Skip to content

Commit

Permalink
FAB-15043 Mig-v1 cleanup #2
Browse files Browse the repository at this point in the history
In preparation to the alternative migration design (v2),
revert some of the commits already merged.

Kafka to Raft migration v1 - cleanup #2
Clean Kafka2Raft green path #4 (of v1)

Clean commit
0504983

Change-Id: Ic7274401fdc58870c997b3204c21f548c7f5d1af
Signed-off-by: Yoav Tock <tock@il.ibm.com>
  • Loading branch information
tock-ibm committed Jun 6, 2019
1 parent 01a7c89 commit eb88d26
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 148 deletions.
56 changes: 0 additions & 56 deletions orderer/common/multichannel/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/consensus"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -69,14 +68,6 @@ func newChainSupport(
// Set up the block writer
cs.BlockWriter = newBlockWriter(lastBlock, registrar, cs)

// TODO Identify recovery after crash in the middle of consensus-type migration
if cs.detectMigration(lastBlock) {
// We do this because the last block after migration (COMMIT/CONTEXT) carries Kafka metadata.
// This prevents the code down the line from unmarshaling it as Raft, and panicking.
metadata.Value = nil
logger.Debugf("[channel: %s] Consensus-type migration: restart on to Raft, resetting Kafka block metadata", cs.ChainID())
}

// Set up the consenter
consenterType := ledgerResources.SharedConfig().ConsensusType()
consenter, ok := consenters[consenterType]
Expand All @@ -94,53 +85,6 @@ func newChainSupport(
return cs
}

// detectMigration identifies restart after consensus-type migration was committed (green path).
// Restart after migration is detected by:
// 1. The Kafka2RaftMigration capability in on
// 2. The last block carries a config-tx
// 3. In the config-tx, you have:
// - (system-channel && state=COMMIT), OR
// - (standard-channel && state=CONTEXT)
// This assumes that migration was successful (green path). When migration ends successfully,
// every channel will have a config block as the last block. On the system channel, containing state=COMMIT;
// on standard channels, containing state=CONTEXT.
func (cs *ChainSupport) detectMigration(lastBlock *cb.Block) bool {
isMigration := false

if !cs.ledgerResources.SharedConfig().Capabilities().Kafka2RaftMigration() {
return isMigration
}

lastConfigIndex, err := utils.GetLastConfigIndexFromBlock(lastBlock)
if err != nil {
logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err)
}

logger.Debugf("[channel: %s], sysChan=%v, lastConfigIndex=%d, H=%d, mig-state: %s",
cs.ChainID(), cs.systemChannel, lastConfigIndex, cs.ledgerResources.Height(),
cs.ledgerResources.SharedConfig().ConsensusMigrationState())

if lastConfigIndex == lastBlock.Header.Number { //The last block was a config-tx
state := cs.ledgerResources.SharedConfig().ConsensusMigrationState()
if cs.systemChannel {
if state == orderer.ConsensusType_MIG_STATE_COMMIT {
isMigration = true
}
} else {
if state == orderer.ConsensusType_MIG_STATE_CONTEXT {
isMigration = true
}
}

if isMigration {
logger.Infof("[channel: %s], Restarting after consensus-type migration. New consensus-type is: %s",
cs.ChainID(), cs.ledgerResources.SharedConfig().ConsensusType())
}
}

return isMigration
}

// Block returns a block with the following number,
// or nil if such a block doesn't exist.
func (cs *ChainSupport) Block(number uint64) *cb.Block {
Expand Down
50 changes: 1 addition & 49 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,7 @@ func (c *Chain) Start() {
}

isJoin := c.support.Height() > 1
isMigration := false
if isJoin {
isMigration = c.detectMigration()
}
c.Node.start(c.fresh, isJoin, isMigration)
c.Node.start(c.fresh, isJoin)

close(c.startC)
close(c.errorC)
Expand All @@ -358,42 +354,6 @@ func (c *Chain) Start() {
c.periodicChecker.Run()
}

// detectMigration detects if the orderer restarts right after consensus-type migration,
// in which the Height>1 but previous blocks were created by Kafka.
// If this is the case, Raft should be started like it is joining a new channel.
func (c *Chain) detectMigration() bool {
startOfChain := false
if c.support.SharedConfig().Capabilities().Kafka2RaftMigration() {
lastConfigIndex, err := utils.GetLastConfigIndexFromBlock(c.lastBlock)
if err != nil {
c.logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err)
}

c.logger.Debugf("Detecting if consensus-type migration, sysChan=%v, lastConfigIndex=%d, Height=%d, mig-state: %s",
c.support.IsSystemChannel(), lastConfigIndex, c.lastBlock.Header.Number+1, c.support.SharedConfig().ConsensusMigrationState().String())

if lastConfigIndex != c.lastBlock.Header.Number { // The last block is not a config-tx
return startOfChain
}

// The last block was a config-tx
if c.support.IsSystemChannel() {
if c.support.SharedConfig().ConsensusMigrationState() == orderer.ConsensusType_MIG_STATE_COMMIT {
startOfChain = true
}
} else {
if c.support.SharedConfig().ConsensusMigrationState() == orderer.ConsensusType_MIG_STATE_CONTEXT {
startOfChain = true
}
}

if startOfChain {
c.logger.Infof("Restarting after consensus-type migration. Type: %s, just starting the channel.", c.support.SharedConfig().ConsensusType())
}
}
return startOfChain
}

// Order submits normal type transactions for ordering.
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
c.Metrics.NormalProposalsReceived.Add(1)
Expand Down Expand Up @@ -1329,14 +1289,6 @@ func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
return nil
}

// Detect if it is a restart right after consensus-type migration. If yes, return early in order to avoid using
// the block metadata as etcdraft.BlockMetadata (see below). Right after migration the block metadata will carry
// Kafka metadata. The etcdraft.BlockMetadata should be extracted from the ConsensusType.Metadata, instead.
if c.detectMigration() {
c.logger.Infof("Restarting after consensus-type migration. Type: %s, just starting the chain.", c.support.SharedConfig().ConsensusType())
return nil
}

// extracting current Raft configuration state
confState := c.Node.ApplyConfChange(raftpb.ConfChange{})

Expand Down
7 changes: 1 addition & 6 deletions orderer/consensus/etcdraft/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3500,12 +3500,7 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,

support := &consensusmocks.FakeConsenterSupport{}
support.ChainIDReturns(channel)
support.SharedConfigReturns(&mockconfig.Orderer{
BatchTimeoutVal: timeout,
CapabilitiesVal: &mockconfig.OrdererCapabilities{
Kafka2RaftMigVal: false,
},
})
support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

cutter := mockblockcutter.NewReceiver()
close(cutter.Block)
Expand Down
17 changes: 0 additions & 17 deletions orderer/consensus/etcdraft/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package etcdraft

import (
"bytes"
"encoding/hex"
"path"
"reflect"
"time"
Expand Down Expand Up @@ -120,27 +119,11 @@ func (c *Consenter) detectSelfID(consenters map[uint64]*etcdraft.Consenter) (uin

// HandleChain returns a new Chain instance or an error upon failure
func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *common.Metadata) (consensus.Chain, error) {

if support.SharedConfig().Capabilities().Kafka2RaftMigration() {
c.Logger.Debugf("SharedConfig.ConsensusType fields: Type=%s, ConsensusMigrationState=%s, ConsensusMigrationContext=%d, ConsensusMetadata length=%d",
support.SharedConfig().ConsensusType(), support.SharedConfig().ConsensusMigrationState(),
support.SharedConfig().ConsensusMigrationContext(), len(support.SharedConfig().ConsensusMetadata()))
if support.SharedConfig().ConsensusMigrationState() != orderer.ConsensusType_MIG_STATE_NONE {
c.Logger.Debugf("SharedConfig.ConsensusType: ConsensusMetadata dump:\n%s", hex.Dump(support.SharedConfig().ConsensusMetadata()))
}
}

m := &etcdraft.ConfigMetadata{}
if err := proto.Unmarshal(support.SharedConfig().ConsensusMetadata(), m); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal consensus metadata")
}

if support.SharedConfig().Capabilities().Kafka2RaftMigration() &&
support.SharedConfig().ConsensusMigrationState() != orderer.ConsensusType_MIG_STATE_NONE {
c.Logger.Debugf("SharedConfig().ConsensusMetadata(): %s", m.String())
c.Logger.Debugf("block metadata.Value dump: \n%s", hex.Dump(metadata.Value))
}

if m.Options == nil {
return nil, errors.New("etcdraft options have not been provided")
}
Expand Down
15 changes: 3 additions & 12 deletions orderer/consensus/etcdraft/consenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,7 @@ var _ = Describe("Consenter", func() {
metadata := utils.MarshalOrPanic(m)
support.SharedConfigReturns(&mockconfig.Orderer{
ConsensusMetadataVal: metadata,
CapabilitiesVal: &mockconfig.OrdererCapabilities{
Kafka2RaftMigVal: false,
},
BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024},
BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024},
})

consenter := newConsenter(chainGetter)
Expand Down Expand Up @@ -199,10 +196,7 @@ var _ = Describe("Consenter", func() {
support := &consensusmocks.FakeConsenterSupport{}
support.SharedConfigReturns(&mockconfig.Orderer{
ConsensusMetadataVal: metadata,
CapabilitiesVal: &mockconfig.OrdererCapabilities{
Kafka2RaftMigVal: false,
},
BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024},
BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024},
})
support.ChainIDReturns("foo")

Expand All @@ -224,10 +218,7 @@ var _ = Describe("Consenter", func() {
metadata := utils.MarshalOrPanic(m)
support.SharedConfigReturns(&mockconfig.Orderer{
ConsensusMetadataVal: metadata,
CapabilitiesVal: &mockconfig.OrdererCapabilities{
Kafka2RaftMigVal: false,
},
BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024},
BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024},
})

consenter := newConsenter(chainGetter)
Expand Down
11 changes: 3 additions & 8 deletions orderer/consensus/etcdraft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,15 @@ type node struct {
raft.Node
}

func (n *node) start(fresh, join, migration bool) {
func (n *node) start(fresh, join bool) {
raftPeers := RaftPeers(n.metadata.ConsenterIds)
n.logger.Debugf("Starting raft node: #peers: %v", len(raftPeers))

var campaign bool
if fresh {
if join {
if !migration {
raftPeers = nil
n.logger.Info("Starting raft node to join an existing channel")

} else {
n.logger.Info("Starting raft node to join an existing channel, after consensus-type migration")
}
raftPeers = nil
n.logger.Info("Starting raft node to join an existing channel")
} else {
n.logger.Info("Starting raft node as part of a new channel")

Expand Down

0 comments on commit eb88d26

Please sign in to comment.