diff --git a/orderer/common/multichannel/registrar.go b/orderer/common/multichannel/registrar.go index 35d2a8b0fbc..18987aec33a 100644 --- a/orderer/common/multichannel/registrar.go +++ b/orderer/common/multichannel/registrar.go @@ -308,7 +308,7 @@ func (r *Registrar) ConsensusMigrationCommit() error { } // ConsensusMigrationAbort checks pre-conditions and aborts the consensus-type migration. -func (r *Registrar) ConsensusMigrationAbort() (err error) { +func (r *Registrar) ConsensusMigrationAbort() error { //TODO implement the consensus-type migration abort path return fmt.Errorf("Not implemented yet") } diff --git a/orderer/common/multichannel/util_test.go b/orderer/common/multichannel/util_test.go index b3296aae4bc..ccae0468811 100644 --- a/orderer/common/multichannel/util_test.go +++ b/orderer/common/multichannel/util_test.go @@ -30,7 +30,7 @@ func (mc *mockConsenter) HandleChain(support consensus.ConsenterSupport, metadat support: support, metadata: metadata, done: make(chan struct{}), - migrationStatus: migration.NewStatusStepper(support.IsSystemChannel(), support.ChainID()), + migrationStatus: migration.NewManager(support.IsSystemChannel(), support.ChainID()), }, nil } diff --git a/orderer/consensus/etcdraft/chain.go b/orderer/consensus/etcdraft/chain.go index dc07da4b15b..30d76bc1f84 100644 --- a/orderer/consensus/etcdraft/chain.go +++ b/orderer/consensus/etcdraft/chain.go @@ -267,7 +267,7 @@ func NewChain( }, logger: lg, opts: opts, - migrationStatus: migration.NewStatusStepper(support.IsSystemChannel(), support.ChainID()), // Needed by consensus-type migration + migrationStatus: migration.NewManager(support.IsSystemChannel(), support.ChainID()), // Needed by consensus-type migration } // DO NOT use Applied option in config, see https://github.com/etcd-io/etcd/issues/10217 diff --git a/orderer/consensus/inactive/inactive_chain.go b/orderer/consensus/inactive/inactive_chain.go index 78426675f00..3daee360d3c 100644 --- a/orderer/consensus/inactive/inactive_chain.go +++ b/orderer/consensus/inactive/inactive_chain.go @@ -45,5 +45,5 @@ func (c *Chain) Halt() { } func (c *Chain) MigrationStatus() migration.Status { - return &migration.StatusImpl{} + return migration.NewManager(false, "inactive") } diff --git a/orderer/consensus/kafka/chain.go b/orderer/consensus/kafka/chain.go index c8d17682ad4..33642a0b53f 100644 --- a/orderer/consensus/kafka/chain.go +++ b/orderer/consensus/kafka/chain.go @@ -79,7 +79,7 @@ func newChain( haltChan: make(chan struct{}), startChan: make(chan struct{}), doneReprocessingMsgInFlight: doneReprocessingMsgInFlight, - migrationStatusStepper: migration.NewStatusStepper(support.IsSystemChannel(), support.ChainID()), + migrationManager: migration.NewManager(support.IsSystemChannel(), support.ChainID()), }, nil } @@ -128,12 +128,12 @@ type chainImpl struct { // provides access to the consensus-type migration status of the chain, // and allows stepping through the state machine. - migrationStatusStepper migration.StatusStepper + migrationManager migration.Manager } // MigrationStatus provides access to the consensus-type migration status of the chain. func (chain *chainImpl) MigrationStatus() migration.Status { - return chain.migrationStatusStepper + return chain.migrationManager } // Errored returns a channel which will close when a partition consumer error @@ -226,8 +226,9 @@ func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error { } func (chain *chainImpl) order(env *cb.Envelope, configSeq uint64, originalOffset int64) error { - // During consensus-type migration: stop all normal txs on the system-channel and standard-channels. - if chain.migrationStatusStepper.IsPending() || chain.migrationStatusStepper.IsCommitted() { + // During consensus-type migration: stop all normal txs on the system-channel and standard-channels. This + // happens in the broadcast-phase, and will prevent new transactions from entering Kafka. + if chain.migrationManager.IsPending() || chain.migrationManager.IsCommitted() { return fmt.Errorf("[channel: %s] cannot enqueue, consensus-type migration pending", chain.ChainID()) } @@ -248,7 +249,7 @@ func (chain *chainImpl) Configure(config *cb.Envelope, configSeq uint64) error { func (chain *chainImpl) configure(config *cb.Envelope, configSeq uint64, originalOffset int64) error { // During consensus-type migration, stop channel creation - if chain.ConsenterSupport.IsSystemChannel() && chain.migrationStatusStepper.IsPending() { + if chain.ConsenterSupport.IsSystemChannel() && chain.migrationManager.IsPending() { ordererTx, err := isOrdererTx(config) if err != nil { err = errors.Wrap(err, "cannot determine if config-tx is of type ORDERER_TX, on system channel") @@ -272,7 +273,7 @@ func (chain *chainImpl) configure(config *cb.Envelope, configSeq uint64, origina return nil } -// enqueue accepts a message and returns true on acceptance, or false otheriwse. +// enqueue accepts a message and returns true on acceptance, or false otherwise. func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool { logger.Debugf("[channel: %s] Enqueueing envelope...", chain.ChainID()) select { @@ -855,8 +856,8 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r offset = chain.lastOriginalOffsetProcessed } - // During consensus-type migration, drop normal messages that managed to sneak in past Order, possibly from other orderers - if chain.migrationStatusStepper.IsPending() || chain.migrationStatusStepper.IsCommitted() { + // During consensus-type migration, drop normal messages on the channel. + if chain.migrationManager.IsPending() || chain.migrationManager.IsCommitted() { logger.Warningf("[channel: %s] Normal message is dropped, consensus-type migration pending", chain.ChainID()) return nil } @@ -935,7 +936,7 @@ func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, r if doCommit { commitConfigMsg(env, offset) } else { - logger.Infof("[channel: %s] Dropping config message with offset %d, because of consensus-type migration step", chain.ChainID(), receivedOffset) + logger.Warningf("[channel: %s] Dropping config message with offset %d, because of consensus-type migration step", chain.ChainID(), receivedOffset) } default: @@ -968,10 +969,10 @@ func (chain *chainImpl) processMigrationStep(configTx *cb.Envelope) (commitBlock switch chdr.Type { case int32(cb.HeaderType_ORDERER_TRANSACTION): - if chain.migrationStatusStepper.IsPending() || chain.migrationStatusStepper.IsCommitted() { + if chain.migrationManager.IsPending() || chain.migrationManager.IsCommitted() { commitBlock = false logger.Debugf("[channel: %s] Consensus-type migration: Dropping ORDERER_TRANSACTION because consensus-type migration pending; Status: %s", - chain.ChainID(), chain.migrationStatusStepper) + chain.ChainID(), chain.migrationManager) } else { commitBlock = true } @@ -999,8 +1000,8 @@ func (chain *chainImpl) processMigrationStep(configTx *cb.Envelope) (commitBlock logger.Infof("[channel: %s] Consensus-type migration: Processing config tx: type: %s, state: %s, context: %d", chain.ChainID(), nextConsensusType, nextMigState.String(), nextMigContext) - commitMigration := false // Prevent shadowing of commitBlock - commitBlock, commitMigration = chain.migrationStatusStepper.Step( // Evaluate the migration state machine + commitMigration := false // Prevent shadowing of commitBlock + commitBlock, commitMigration = chain.migrationManager.Step( // Evaluate the migration state machine chain.ChainID(), nextConsensusType, nextMigState, nextMigContext, chain.lastCutBlockNumber, chain.consenter.migrationController()) logger.Debugf("[channel: %s] Consensus-type migration: commitBlock=%v, commitMigration=%v", chain.ChainID(), commitBlock, commitMigration) @@ -1017,15 +1018,15 @@ func (chain *chainImpl) processMigrationStep(configTx *cb.Envelope) (commitBlock block := chain.CreateNextBlock([]*cb.Envelope{configTx}) replacer := file.NewReplacer(chain.consenter.bootstrapFile()) if err = replacer.ReplaceGenesisBlockFile(block); err != nil { - _, context := chain.migrationStatusStepper.StateContext() - chain.migrationStatusStepper.SetStateContext(ab.ConsensusType_MIG_STATE_START, context) //Undo the commit + _, context := chain.migrationManager.StateContext() + chain.migrationManager.SetStateContext(ab.ConsensusType_MIG_STATE_START, context) //Undo the commit logger.Warningf("[channel: %s] Consensus-type migration: Reject Config tx on system channel, cannot replace bootstrap file; Status: %s", - chain.ChainID(), chain.migrationStatusStepper.String()) + chain.ChainID(), chain.migrationManager.String()) return false, err } logger.Infof("[channel: %s] Consensus-type migration: committed; Replaced bootstrap file: %s; Status: %s", - chain.ChainID(), chain.consenter.bootstrapFile(), chain.migrationStatusStepper.String()) + chain.ChainID(), chain.consenter.bootstrapFile(), chain.migrationManager.String()) } default: diff --git a/orderer/consensus/kafka/chain_test.go b/orderer/consensus/kafka/chain_test.go index 03bb0b38022..f0eb48218b9 100644 --- a/orderer/consensus/kafka/chain_test.go +++ b/orderer/consensus/kafka/chain_test.go @@ -2581,7 +2581,7 @@ func TestResubmission(t *testing.T) { errorChan: errorChan, haltChan: haltChan, doneProcessingMessagesToBlocks: make(chan struct{}), - migrationStatusStepper: migration.NewStatusStepper(mockSupport.IsSystemChannel(), mockSupport.ChainID()), + migrationManager: migration.NewManager(mockSupport.IsSystemChannel(), mockSupport.ChainID()), } var counts []uint64 @@ -2771,7 +2771,7 @@ func TestResubmission(t *testing.T) { haltChan: haltChan, doneProcessingMessagesToBlocks: make(chan struct{}), doneReprocessingMsgInFlight: doneReprocessing, - migrationStatusStepper: migration.NewStatusStepper(mockSupport.IsSystemChannel(), mockSupport.ChainID()), + migrationManager: migration.NewManager(mockSupport.IsSystemChannel(), mockSupport.ChainID()), } var counts []uint64 diff --git a/orderer/consensus/migration/migration.go b/orderer/consensus/migration/migration.go index 7fe55b835ce..d06068f389c 100644 --- a/orderer/consensus/migration/migration.go +++ b/orderer/consensus/migration/migration.go @@ -33,11 +33,27 @@ type Status interface { IsCommitted() bool } -// Stepper allows the underlying chain to execute the migration state machine. -type Stepper interface { - // Step evaluates the migration state machine of a particular chain. It returns whether the block should be - // committed to the ledger or dropped (commitBlock), and whether the bootstrap file (a.k.a. genesis block) - // should be replaced (commitMigration). +// ConsensusTypeInfo carries the fields of protos/orderer/ConsensusType that are contained in a proposed or ordered +// config update transaction. +type ConsensusTypeInfo struct { + Type string + Metadata []byte + State orderer.ConsensusType_MigrationState + Context uint64 +} + +// Manager is in charge of exposing the Status of the migration, providing methods for validating and filtering +// incoming config updates (before ordering), and allowing the underlying chain to execute the migration state machine +// in response to ordered config updates and signals from the migration controller. +type Manager interface { + Status + + // Step evaluates whether a config update is allowed to be committed. It is called when a config transaction is + // consumed from Kafka, i.e. after ordering. It returns whether the block that contains said transaction should be + // committed to the ledger or dropped (commitConfigTx), and whether the bootstrap file (a.k.a. genesis block) + // should be replaced (replaceBootstrapFile). Step may change the status of the underlying chain, and in case of + // the system chain, it may also change the status of standard chains, via its interaction with the + // migrationController (Registrar). Step( chainID string, nextConsensusType string, @@ -45,171 +61,187 @@ type Stepper interface { nextMigContext uint64, lastCutBlockNumber uint64, migrationController Controller, - ) (commitBlock bool, commitMigration bool) -} + ) (commitConfigTx bool, replaceBootstrapFile bool) -// StatusStepper is a composition of the Status and Stepper interfaces. -type StatusStepper interface { - Status - Stepper + // CheckAllowed evaluates whether a config update is allowed to be enqueued for ordering by checking against the chain's + // status and migrationController. It is called during the broadcast phase and never changes the status of the + // underlying chain. + CheckAllowed(next *ConsensusTypeInfo, migrationController Controller) error + + // Validate checks the validity of the state transitions of a possible migration config update tx by comparing the + // current ConsensusTypeInfo config with the next (proposed) ConsensusTypeInfo. It is called during the broadcast + // phase and never changes the status of the underlying chain. + Validate(current, next *ConsensusTypeInfo) error } -//go:generate counterfeiter -o mocks/consensus_migration_controller.go . Controller +//go:generate counterfeiter -o ../mocks/consensus_migration_controller.go . Controller // Controller defines methods for controlling and coordinating the process of consensus-type migration. -// It is implemented by the Registrar and is used by the system and standard chains. +// It is implemented by the multichannel.Registrar and is used by the system and standard chains. type Controller interface { // ConsensusMigrationPending checks whether consensus-type migration had started, // by inspecting the status of the system channel. ConsensusMigrationPending() bool // ConsensusMigrationStart marks every standard channel as "START" with the given context. - // It should first check that consensus-type migration is not pending on any of the standard channels. + // It should first check that consensus-type migration is not started on any of the standard channels. // This call is always triggered by a MigrationState="START" config update on the system channel. - // The context is the height of the system channel config block that carries said config update. - ConsensusMigrationStart(context uint64) (err error) + // The context is the future height of the system channel config block that carries said config update. + ConsensusMigrationStart(context uint64) error // ConsensusMigrationCommit verifies that the conditions for committing the consensus-type migration // are met, and if so, marks the system channel as committed. // The conditions are: - // 1. system channel mast be at START with context >0; - // 2. all standard channels must be at START with the same context as the system channel. - ConsensusMigrationCommit() (err error) + // 1. system channel must be at START with context >0; + // 2. all standard channels must be at CONTEXT with the same context as the system channel. + ConsensusMigrationCommit() error // ConsensusMigrationAbort verifies that the conditions for aborting the consensus-type migration - // are met, and if so, marks the system channel as aborted. - // The conditions are: - // 1. system channel mast be at START - // 2. all standard channels must be at START or CONTEXT - ConsensusMigrationAbort() (err error) + // are met, and if so, marks the system channel as aborted. These conditions are: + // 1. system channel must be at START; + // 2. all standard channels must be at ABORT. + ConsensusMigrationAbort() error } -// StatusImpl is an implementation of the StatusStepper interface, -// which provides access to the consensus-type migration status of the underlying chain. -// The methods that accept objects of this type are thread-safe. -type StatusImpl struct { - // mutex protects state and context. +// statusManager is an implementation of the Manager interface. +// The exported methods that accept objects of this type are thread-safe. +type statusManager struct { + // mutex protects state, context, and startCond. mutex sync.Mutex // state must be accessed with mutex locked. state orderer.ConsensusType_MigrationState // context must be accessed with mutex locked. context uint64 + // startCond is used to wait for the start signal //TODO not used yet + startCond *sync.Cond // systemChannel does not need to be protected by mutex since it is immutable after creation. systemChannel bool + // logger is decorated with the chainID logger *flogging.FabricLogger } -// NewStatusStepper generates a new StatusStepper implementation. -func NewStatusStepper(sysChan bool, chainID string) StatusStepper { - return &StatusImpl{ +// NewManager generates a new Manager implementation. +func NewManager(sysChan bool, chainID string) Manager { + m := &statusManager{ systemChannel: sysChan, logger: flogging.MustGetLogger("orderer.consensus.migration").With("channel", chainID), } + m.startCond = sync.NewCond(&m.mutex) + return m } // StateContext returns the consensus-type migration state and context. -func (ms *StatusImpl) StateContext() (state orderer.ConsensusType_MigrationState, context uint64) { - ms.mutex.Lock() - defer ms.mutex.Unlock() - return ms.state, ms.context +func (sm *statusManager) StateContext() (state orderer.ConsensusType_MigrationState, context uint64) { + sm.mutex.Lock() + defer sm.mutex.Unlock() + return sm.state, sm.context } // SetStateContext sets the consensus-type migration state and context. -func (ms *StatusImpl) SetStateContext(state orderer.ConsensusType_MigrationState, context uint64) { - ms.mutex.Lock() - defer ms.mutex.Unlock() - ms.state = state - ms.context = context +func (sm *statusManager) SetStateContext(state orderer.ConsensusType_MigrationState, context uint64) { + sm.mutex.Lock() + defer sm.mutex.Unlock() + sm.state = state + sm.context = context } // IsPending returns true if migration is pending. -func (ms *StatusImpl) IsPending() bool { - ms.mutex.Lock() - defer ms.mutex.Unlock() +func (sm *statusManager) IsPending() bool { + sm.mutex.Lock() + defer sm.mutex.Unlock() - if ms.systemChannel { - return ms.state == orderer.ConsensusType_MIG_STATE_START + if sm.systemChannel { + return sm.state == orderer.ConsensusType_MIG_STATE_START } - return ms.state == orderer.ConsensusType_MIG_STATE_START || ms.state == orderer.ConsensusType_MIG_STATE_CONTEXT + return sm.state == orderer.ConsensusType_MIG_STATE_START || sm.state == orderer.ConsensusType_MIG_STATE_CONTEXT } // IsCommitted returns true if migration is committed. -func (ms *StatusImpl) IsCommitted() bool { - ms.mutex.Lock() - defer ms.mutex.Unlock() +func (sm *statusManager) IsCommitted() bool { + sm.mutex.Lock() + defer sm.mutex.Unlock() - if ms.systemChannel { - return ms.state == orderer.ConsensusType_MIG_STATE_COMMIT + if sm.systemChannel { + return sm.state == orderer.ConsensusType_MIG_STATE_COMMIT } return false } +// Call this only inside a lock. +func (sm *statusManager) string() string { + return fmt.Sprintf("State=%s, Context=%d, Sys=%t", sm.state, sm.context, sm.systemChannel) +} + // String returns a text representation. -func (ms *StatusImpl) String() string { - ms.mutex.Lock() - defer ms.mutex.Unlock() +// Never call this inside a lock; mutex is not recurrent. +func (sm *statusManager) String() string { + sm.mutex.Lock() + defer sm.mutex.Unlock() - return fmt.Sprintf("State=%s, Context=%d, Sys=%t", ms.state, ms.context, ms.systemChannel) + return sm.string() } -// Step evaluates the migration state machine of a particular chain. It returns whether -// the block should be committed to the ledger or dropped (commitBlock), and whether the bootstrap file -// (a.k.a. genesis block) should be replaced (commitMigration). +// Step receives as input the ConsensusType fields of an incoming ordered (i.e., consumed from Kafka) config-tx, +// and evaluates the migration state machine of a particular chain. It returns whether the config-tx should be +// committed to the ledger or dropped (commitConfigTx), and whether the bootstrap file (a.k.a. genesis block) should +// be replaced (replaceBootstrapFile). // -// When we get a message, we check whether it is a permitted transition of the state machine, and whether the -// parameters are correct. If it is a valid transition, we return commitBlock=true, which will cause the caller to -// commit the block to the ledger. +// When we process said fields of a config-tx, we check whether it is a permitted transition of the state machine, and +// whether the parameters are correct. If it is a valid transition, we return commitConfigTx=true, which will cause +// the caller to commit the respective config block to the ledger. // -// When we get a message that is a COMMIT, which is the final step of migration (this can only happen on the system -// channel), we also return commitMigration=true, which will cause the caller to replace the bootstrap file +// When we process a config-tx that is a COMMIT, which is the final step of migration (this can only happen on the +// system channel), we also return replaceBootstrapFile=true, which will cause the caller to replace the bootstrap file // (genesis block), as well as commit the block to the ledger. -// -// Note: the method may call the multichannel.Registrar (migrationController). The Registrar takes a mutex, and then -// calls individual migration.Status objects (.i.e. the lock of the migration.Status mutex is nested within the lock of -// Registrar mutex). In order to avoid deadlocks, here we only call the Registrar (migrationController) when the -// internal mutex in NOT taken. -func (ms *StatusImpl) Step( +func (sm *statusManager) Step( chainID string, nextConsensusType string, nextMigState orderer.ConsensusType_MigrationState, nextMigContext uint64, lastCutBlockNumber uint64, migrationController Controller, -) (commitBlock bool, commitMigration bool) { +) (commitConfigTx bool, replaceBootstrapFile bool) { - ms.logger.Debugf("Consensus-type migration: Config tx; Current status: %s; Input TX: Type=%s, State=%s, Ctx=%d; lastBlock=%d", - ms, nextConsensusType, nextMigState, nextMigContext, lastCutBlockNumber) + sm.logger.Debugf("Consensus-type migration: Config tx; Current status: %s; Input TX: Type=%s, State=%s, Ctx=%d; lastBlock=%d", + sm, nextConsensusType, nextMigState, nextMigContext, lastCutBlockNumber) - if ms.systemChannel { - commitBlock, commitMigration = ms.stepSystem( + if sm.systemChannel { + commitConfigTx, replaceBootstrapFile = sm.stepSystem( nextConsensusType, nextMigState, nextMigContext, lastCutBlockNumber, migrationController) } else { - commitBlock = ms.stepStandard( + commitConfigTx = sm.stepStandard( nextConsensusType, nextMigState, nextMigContext, migrationController) } - return commitBlock, commitMigration + return commitConfigTx, replaceBootstrapFile } -func (ms *StatusImpl) stepSystem( +// stepSystem executes the state machine on the system channel. +// +// Note: the method may call the multichannel.Registrar (migrationController). The Registrar takes a mutex, and then +// calls individual migration.Manager objects (i.e. the lock of the migration.Manager mutex is nested within the lock of +// Registrar mutex). In order to avoid deadlocks, here we only call the Registrar (migrationController) when the +// internal mutex in NOT taken. The migrationController calls migration.Manager.NotifyStart/Commit/Abort to change +// the status of individual migration.Manager objects. +func (sm *statusManager) stepSystem( nextConsensusType string, nextMigState orderer.ConsensusType_MigrationState, nextMigContext uint64, lastCutBlockNumber uint64, migrationController Controller, -) (commitBlock bool, commitMigration bool) { +) (commitConfigTx bool, replaceBootstrapFile bool) { unexpectedTransitionResponse := func(from, to orderer.ConsensusType_MigrationState) { - ms.logger.Debugf("Consensus-type migration: Dropping config tx because: unexpected consensus-type migration state transition: %s to %s", from, to) - commitBlock = false - commitMigration = false + sm.logger.Debugf("Consensus-type migration: Dropping config tx because: unexpected consensus-type migration state transition: %s to %s", from, to) + commitConfigTx = false + replaceBootstrapFile = false } - currState, currContext := ms.StateContext() + currState, currContext := sm.StateContext() switch currState { case orderer.ConsensusType_MIG_STATE_START: @@ -219,17 +251,17 @@ func (ms *StatusImpl) stepSystem( if currContext == nextMigContext { err := migrationController.ConsensusMigrationCommit() if err != nil { - ms.logger.Warningf("Consensus-type migration: Reject Config tx on system channel, migrationCommit failed; error=%s", err) + sm.logger.Warningf("Consensus-type migration: Reject Config tx on system channel, migrationCommit failed; error=%s", err) } else { - commitBlock = true - commitMigration = true + commitConfigTx = true + replaceBootstrapFile = true } } else { - ms.logger.Warningf("Consensus-type migration: Reject Config tx on system channel; %s to %s, because of bad context:(tx=%d/exp=%d)", + sm.logger.Warningf("Consensus-type migration: Reject Config tx on system channel; %s to %s, because of bad context:(tx=%d/exp=%d)", currState, nextMigState, nextMigContext, currContext) } case orderer.ConsensusType_MIG_STATE_ABORT: - ms.logger.Panicf("Consensus-type migration: Not implemented yet, transition %s to %s", currState, nextMigState) + sm.logger.Panicf("Consensus-type migration: Not implemented yet, transition %s to %s", currState, nextMigState) //TODO implement abort path default: unexpectedTransitionResponse(currState, nextMigState) @@ -237,7 +269,7 @@ func (ms *StatusImpl) stepSystem( case orderer.ConsensusType_MIG_STATE_COMMIT: //=== Migration committed, nothing left to do === - ms.logger.Debug("Consensus-type migration: Config tx on system channel, migration already committed, nothing left to do, dropping;") + sm.logger.Debug("Consensus-type migration: Config tx on system channel, migration already committed, nothing left to do, dropping;") case orderer.ConsensusType_MIG_STATE_ABORT, orderer.ConsensusType_MIG_STATE_NONE: //=== Migration is NOT pending, expect NONE or START === @@ -245,38 +277,39 @@ func (ms *StatusImpl) stepSystem( case orderer.ConsensusType_MIG_STATE_START: err := migrationController.ConsensusMigrationStart(lastCutBlockNumber + 1) if err != nil { - ms.logger.Warningf("Consensus-type migration: Reject Config tx on system channel, migrationStart failed; error=%s", err) + sm.logger.Warningf("Consensus-type migration: Reject Config tx on system channel, migrationStart failed; error=%s", err) } else { - ms.logger.Infof("Consensus-type migration: started; Status: %s", ms) - commitBlock = true + sm.logger.Infof("Consensus-type migration: started; Status: %s", sm) + commitConfigTx = true } case orderer.ConsensusType_MIG_STATE_NONE: - commitBlock = true + commitConfigTx = true default: unexpectedTransitionResponse(currState, nextMigState) } default: - ms.logger.Panicf("Consensus-type migration: Unexpected status, probably a bug; Current: %s; Input TX: State=%s, Context=%d, nextConsensusType=%s", - ms, nextMigState, nextMigContext, nextConsensusType) + sm.logger.Panicf("Consensus-type migration: Unexpected status, probably a bug; Current: %s; Input TX: State=%s, Context=%d, nextConsensusType=%s", + sm, nextMigState, nextMigContext, nextConsensusType) } - return commitBlock, commitMigration + return commitConfigTx, replaceBootstrapFile } -func (ms *StatusImpl) stepStandard( +// stepStandard valuates the next ordered config update on a standard channel +func (sm *statusManager) stepStandard( nextConsensusType string, nextMigState orderer.ConsensusType_MigrationState, nextMigContext uint64, migrationController Controller, -) (commitBlock bool) { +) (commitConfigTx bool) { unexpectedTransitionResponse := func(from, to orderer.ConsensusType_MigrationState) { - ms.logger.Debugf("Consensus-type migration: Dropping config tx because: unexpected consensus-type migration state transition: %s to %s", from, to) - commitBlock = false + sm.logger.Debugf("Consensus-type migration: Dropping config tx because: unexpected consensus-type migration state transition: %s to %s", from, to) + commitConfigTx = false } - currState, currContext := ms.StateContext() + currState, currContext := sm.StateContext() switch currState { case orderer.ConsensusType_MIG_STATE_START: @@ -285,15 +318,15 @@ func (ms *StatusImpl) stepStandard( case orderer.ConsensusType_MIG_STATE_CONTEXT: if migrationController.ConsensusMigrationPending() && //On the system channel (nextMigContext == currContext) { - ms.SetStateContext(nextMigState, nextMigContext) - ms.logger.Infof("Consensus-type migration: context accepted; Status: %s", ms) - commitBlock = true + sm.SetStateContext(nextMigState, nextMigContext) + sm.logger.Infof("Consensus-type migration: context accepted; Status: %s", sm) + commitConfigTx = true } else { - ms.logger.Warningf("Consensus-type migration: context rejected; migrationPending=%v, context:(tx=%d/exp=%d)", + sm.logger.Warningf("Consensus-type migration: context rejected; migrationPending=%v, context:(tx=%d/exp=%d)", migrationController.ConsensusMigrationPending(), nextMigContext, currContext) } case orderer.ConsensusType_MIG_STATE_ABORT: - ms.logger.Panicf("Consensus-type migration: Not implemented yet, transition %s to %s", currState, nextMigState) + sm.logger.Panicf("Consensus-type migration: Not implemented yet, transition %s to %s", currState, nextMigState) //TODO implement abort path default: unexpectedTransitionResponse(currState, nextMigState) @@ -303,7 +336,7 @@ func (ms *StatusImpl) stepStandard( //=== Migration not started or aborted, expect NONE (START is set by system channel, not message) switch nextMigState { case orderer.ConsensusType_MIG_STATE_NONE: - commitBlock = true + commitConfigTx = true default: unexpectedTransitionResponse(currState, nextMigState) } @@ -312,16 +345,39 @@ func (ms *StatusImpl) stepStandard( //=== Migration pending, expect ABORT, or nothing else to do (restart to Raft) switch nextMigState { case orderer.ConsensusType_MIG_STATE_ABORT: - ms.logger.Panicf("Consensus-type migration: Not implemented yet, transition %s to %s", currState, nextMigState) + sm.logger.Panicf("Consensus-type migration: Not implemented yet, transition %s to %s", currState, nextMigState) //TODO implement abort path default: unexpectedTransitionResponse(currState, nextMigState) } default: - ms.logger.Panicf("Consensus-type migration: Unexpected status, probably a bug; Current: %s; Input TX: State=%s, Context=%d, nextConsensusType=%s", - ms, nextMigState, nextMigContext, nextConsensusType) + sm.logger.Panicf("Consensus-type migration: Unexpected status, probably a bug; Current: %s; Input TX: State=%s, Context=%d, nextConsensusType=%s", + sm, nextMigState, nextMigContext, nextConsensusType) } - return commitBlock + return commitConfigTx +} + +// CheckAllowed evaluates whether a config update is allowed to be enqueued for ordering. It is called during the +// broadcast phase and never changes the status of the underlying chain. The system channel consults with the +// migrationController on whether the global state of the standard channels satisfies the conditions for the config +// update to advance. The standard channels consult with their local status on whether the conditions are satisfied +// for the config update to advance. +func (sm *statusManager) CheckAllowed(next *ConsensusTypeInfo, migrationController Controller) error { + //TODO + return nil +} + +// VerifyRaftMetadata performs some rudimentary validity checks on the Raft metadata object. +func (sm *statusManager) VerifyRaftMetadata(metadataBytes []byte) error { + //TODO + return nil +} + +// Validate checks the validity of the state transitions of a possible migration config update tx by comparing the +// current ConsensusTypeInfo config with the next (proposed) ConsensusTypeInfo. +func (sm *statusManager) Validate(current, next *ConsensusTypeInfo) error { + //TODO + return nil } diff --git a/orderer/consensus/migration/migration_test.go b/orderer/consensus/migration/migration_test.go index 2b9bd2ea7d5..6526c4e2e0b 100644 --- a/orderer/consensus/migration/migration_test.go +++ b/orderer/consensus/migration/migration_test.go @@ -17,14 +17,14 @@ func TestStateContextSystem(t *testing.T) { sysChan := true t.Run("Get", func(t *testing.T) { - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") state, context := status.StateContext() assert.Equal(t, orderer.ConsensusType_MIG_STATE_NONE, state, "Must be initialized to %s", orderer.ConsensusType_MIG_STATE_NONE) assert.Equal(t, uint64(0), context, "Must be initialized to 0") }) t.Run("Green", func(t *testing.T) { - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") t.Logf("status: %s", status.String()) status.SetStateContext(orderer.ConsensusType_MIG_STATE_START, 2) assert.True(t, status.IsPending()) @@ -38,7 +38,7 @@ func TestStateContextSystem(t *testing.T) { }) t.Run("Abort", func(t *testing.T) { - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") t.Logf("status: %s", status.String()) status.SetStateContext(orderer.ConsensusType_MIG_STATE_START, 2) assert.True(t, status.IsPending()) @@ -56,14 +56,14 @@ func TestStateContextStandard(t *testing.T) { sysChan := false t.Run("Get", func(t *testing.T) { - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") state, context := status.StateContext() assert.Equal(t, orderer.ConsensusType_MIG_STATE_NONE, state, "Must be initialized to %s", orderer.ConsensusType_MIG_STATE_NONE) assert.Equal(t, uint64(0), context, "Must be initialized to 0") }) t.Run("Green", func(t *testing.T) { - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") t.Logf("status: %s", status.String()) status.SetStateContext(orderer.ConsensusType_MIG_STATE_START, 2) assert.True(t, status.IsPending()) @@ -77,7 +77,7 @@ func TestStateContextStandard(t *testing.T) { }) t.Run("Abort", func(t *testing.T) { - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") t.Logf("status: %s", status.String()) status.SetStateContext(orderer.ConsensusType_MIG_STATE_START, 2) assert.True(t, status.IsPending()) @@ -91,7 +91,7 @@ func TestStateContextStandard(t *testing.T) { func TestStepSysFromNone(t *testing.T) { sysChan := true migController := mocks.FakeMigrationController{} - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") t.Run("None-None", func(t *testing.T) { t.Logf("status before: %s", status.String()) @@ -145,7 +145,7 @@ func TestStepSysFromNone(t *testing.T) { func TestStepSysFromStart(t *testing.T) { sysChan := true migController := mocks.FakeMigrationController{} - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") lastBlockCut := uint64(6) context := lastBlockCut + 1 status.SetStateContext(orderer.ConsensusType_MIG_STATE_START, context) @@ -206,7 +206,7 @@ func TestStepSysFromStart(t *testing.T) { func TestStepSysFromCommit(t *testing.T) { sysChan := true migController := mocks.FakeMigrationController{} - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") lastBlockCut := uint64(6) context := lastBlockCut + 1 status.SetStateContext(orderer.ConsensusType_MIG_STATE_COMMIT, context) @@ -266,7 +266,7 @@ func TestStepSysFromAbort(t *testing.T) { func TestStepSysFromContext(t *testing.T) { sysChan := true migController := mocks.FakeMigrationController{} - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") lastBlockCut := uint64(6) context := lastBlockCut + 1 status.SetStateContext(orderer.ConsensusType_MIG_STATE_CONTEXT, context) @@ -301,7 +301,7 @@ func TestStepSysFromContext(t *testing.T) { func TestStepStdFromNone(t *testing.T) { sysChan := false migController := mocks.FakeMigrationController{} - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") t.Run("None-None", func(t *testing.T) { t.Logf("status before: %s", status.String()) @@ -343,7 +343,7 @@ func TestStepStdFromNone(t *testing.T) { func TestStepStdFromStart(t *testing.T) { sysChan := false migController := mocks.FakeMigrationController{} - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") lastBlockCut := uint64(6) context := lastBlockCut + 1 status.SetStateContext(orderer.ConsensusType_MIG_STATE_START, context) @@ -392,7 +392,7 @@ func TestStepStdFromStart(t *testing.T) { func TestStepStdFromContext(t *testing.T) { sysChan := false migController := mocks.FakeMigrationController{} - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") lastBlockCut := uint64(6) context := lastBlockCut + 1 status.SetStateContext(orderer.ConsensusType_MIG_STATE_CONTEXT, context) @@ -434,7 +434,7 @@ func TestStepStdFromContext(t *testing.T) { func TestStepStdFromCommit(t *testing.T) { sysChan := false migController := mocks.FakeMigrationController{} - status := migration.NewStatusStepper(sysChan, "test") + status := migration.NewManager(sysChan, "test") lastBlockCut := uint64(6) context := lastBlockCut + 1 diff --git a/orderer/consensus/solo/consensus.go b/orderer/consensus/solo/consensus.go index 4e8499fa954..b1b0832fc8b 100644 --- a/orderer/consensus/solo/consensus.go +++ b/orderer/consensus/solo/consensus.go @@ -50,7 +50,7 @@ func newChain(support consensus.ConsenterSupport) *chain { support: support, sendChan: make(chan *message), exitChan: make(chan struct{}), - migrationStatus: migration.NewStatusStepper(support.IsSystemChannel(), support.ChainID()), + migrationStatus: migration.NewManager(support.IsSystemChannel(), support.ChainID()), } }