Skip to content

Commit

Permalink
Merge changes Ib734da20,I4c222852,I52a1283e,Ic7274401,I0eefdaed into …
Browse files Browse the repository at this point in the history
…release-1.4

* changes:
  FAB-15511 Kafka2Raft-V1 cleanup #5
  FAB-15045 Mig-v1 cleanup #4
  FAB-15044 Mig-v1 cleanup #3
  FAB-15043 Mig-v1 cleanup #2
  FAB-14700  Kafka2Raft validate broadcast
  • Loading branch information
Jason Yellick authored and Gerrit Code Review committed Jun 7, 2019
2 parents 5af00ce + 3fd414c commit 09e245b
Show file tree
Hide file tree
Showing 23 changed files with 139 additions and 1,986 deletions.
64 changes: 0 additions & 64 deletions orderer/common/broadcast/mock/channel_support_registrar.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 0 additions & 67 deletions orderer/common/multichannel/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/consensus"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
)
Expand All @@ -28,9 +27,6 @@ type ChainSupport struct {
consensus.Chain
cutter blockcutter.Receiver
crypto.LocalSigner
// Needed for consensus-type migration: to execute the migration state machine correctly,
// chains need to know if they are system or standard channel.
systemChannel bool
}

func newChainSupport(
Expand Down Expand Up @@ -60,23 +56,12 @@ func newChainSupport(
),
}

// When ConsortiumsConfig exists, it is the system channel
_, cs.systemChannel = ledgerResources.ConsortiumsConfig()

// Set up the msgprocessor
cs.Processor = msgprocessor.NewStandardChannel(cs, msgprocessor.CreateStandardChannelFilters(cs))

// Set up the block writer
cs.BlockWriter = newBlockWriter(lastBlock, registrar, cs)

// TODO Identify recovery after crash in the middle of consensus-type migration
if cs.detectMigration(lastBlock) {
// We do this because the last block after migration (COMMIT/CONTEXT) carries Kafka metadata.
// This prevents the code down the line from unmarshaling it as Raft, and panicking.
metadata.Value = nil
logger.Debugf("[channel: %s] Consensus-type migration: restart on to Raft, resetting Kafka block metadata", cs.ChainID())
}

// Set up the consenter
consenterType := ledgerResources.SharedConfig().ConsensusType()
consenter, ok := consenters[consenterType]
Expand All @@ -94,53 +79,6 @@ func newChainSupport(
return cs
}

// detectMigration identifies restart after consensus-type migration was committed (green path).
// Restart after migration is detected by:
// 1. The Kafka2RaftMigration capability in on
// 2. The last block carries a config-tx
// 3. In the config-tx, you have:
// - (system-channel && state=COMMIT), OR
// - (standard-channel && state=CONTEXT)
// This assumes that migration was successful (green path). When migration ends successfully,
// every channel will have a config block as the last block. On the system channel, containing state=COMMIT;
// on standard channels, containing state=CONTEXT.
func (cs *ChainSupport) detectMigration(lastBlock *cb.Block) bool {
isMigration := false

if !cs.ledgerResources.SharedConfig().Capabilities().Kafka2RaftMigration() {
return isMigration
}

lastConfigIndex, err := utils.GetLastConfigIndexFromBlock(lastBlock)
if err != nil {
logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err)
}

logger.Debugf("[channel: %s], sysChan=%v, lastConfigIndex=%d, H=%d, mig-state: %s",
cs.ChainID(), cs.systemChannel, lastConfigIndex, cs.ledgerResources.Height(),
cs.ledgerResources.SharedConfig().ConsensusMigrationState())

if lastConfigIndex == lastBlock.Header.Number { //The last block was a config-tx
state := cs.ledgerResources.SharedConfig().ConsensusMigrationState()
if cs.systemChannel {
if state == orderer.ConsensusType_MIG_STATE_COMMIT {
isMigration = true
}
} else {
if state == orderer.ConsensusType_MIG_STATE_CONTEXT {
isMigration = true
}
}

if isMigration {
logger.Infof("[channel: %s], Restarting after consensus-type migration. New consensus-type is: %s",
cs.ChainID(), cs.ledgerResources.SharedConfig().ConsensusType())
}
}

return isMigration
}

// Block returns a block with the following number,
// or nil if such a block doesn't exist.
func (cs *ChainSupport) Block(number uint64) *cb.Block {
Expand Down Expand Up @@ -239,8 +177,3 @@ func (cs *ChainSupport) VerifyBlockSignature(sd []*cb.SignedData, envelope *cb.C
}
return nil
}

// IsSystemChannel returns true if this is the system channel.
func (cs *ChainSupport) IsSystemChannel() bool {
return cs.systemChannel
}
70 changes: 0 additions & 70 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,6 @@ func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
r.consenters = consenters
existingChains := r.ledgerFactory.ChainIDs()

//TODO To initialize after consensus-type migration, it is necessary to identify the system channel and create it first,
// determining the correct consensus-type and the state of the migration. This is needed for recovery, in case the
// migration process crashes before it is committed.

for _, chainID := range existingChains {
rl, err := r.ledgerFactory.GetOrCreate(chainID)
if err != nil {
Expand Down Expand Up @@ -228,10 +224,6 @@ func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader
cs := r.GetChain(chdr.ChannelId)
// New channel creation
if cs == nil {
// Prevent channel creation during consensus-type migration
if r.ConsensusMigrationPending() {
return chdr, true, nil, errors.New("cannot create channel because consensus-type migration is pending")
}
cs = r.systemChannel
}

Expand All @@ -247,68 +239,6 @@ func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader
return chdr, isConfig, cs, nil
}

// ConsensusMigrationPending checks whether consensus-type migration is started on the system channel.
func (r *Registrar) ConsensusMigrationPending() bool {
//Note: systemChannel.MigrationStatus().IsPending() is thread safe, takes a mutex
return r.systemChannel.MigrationStatus().IsPending()
}

// ConsensusMigrationStart checks whether consensus-type migration had started,
// and then marks all standard channels as started.
func (r *Registrar) ConsensusMigrationStart(context uint64) error {
r.lock.Lock()
defer r.lock.Unlock()

for id, chain := range r.chains {
if id != r.systemChannel.ChainID() && chain.MigrationStatus().IsPending() {
return errors.Errorf("cannot start new consensus-type migration because standard channel %s, still pending", id)
}
}

for _, chain := range r.chains {
chain.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_START, context)
}
logger.Debugf("Consensus-type migration: all standard channels marked as started, context=%d", context)

return nil
}

// ConsensusMigrationCommit checks pre-conditions and commits the consensus-type migration.
func (r *Registrar) ConsensusMigrationCommit() error {
r.lock.Lock()
defer r.lock.Unlock()

sysState, sysContext := r.systemChannel.MigrationStatus().StateContext()
if !(sysState == ab.ConsensusType_MIG_STATE_START && sysContext > 0) {
return errors.Errorf("cannot commit consensus-type migration because system channel (%s): state=%s, context=%d (expect: state=%s, context>0)",
r.systemChannel.ChainID(), sysState, sysContext, ab.ConsensusType_MIG_STATE_START)
}

for id, chain := range r.chains {
st, ctx := chain.MigrationStatus().StateContext()
if id == r.systemChannel.ChainID() {
continue
}
if st != ab.ConsensusType_MIG_STATE_CONTEXT {
return errors.Errorf("cannot commit consensus-type migration because standard channel %s, still pending, state=%s", id, st)
}
if ctx != sysContext {
return errors.Errorf("cannot commit consensus-type migration because standard channel %s, bad context=%d, expected=%d", id, ctx, sysContext)
}
}

r.systemChannel.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_COMMIT, sysContext)
logger.Debugf("Consensus-type migration: system channel marked as committed, context=%d", sysContext)

return nil
}

// ConsensusMigrationAbort checks pre-conditions and aborts the consensus-type migration.
func (r *Registrar) ConsensusMigrationAbort() error {
//TODO implement the consensus-type migration abort path
return fmt.Errorf("Not implemented yet")
}

// GetChain retrieves the chain support for a chain if it exists.
func (r *Registrar) GetChain(chainID string) *ChainSupport {
r.lock.RLock()
Expand Down
Loading

0 comments on commit 09e245b

Please sign in to comment.