From 2d924d0d8a7e6da24b873a121fe13e360afcce9f Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Tue, 19 Mar 2019 00:50:48 +0200 Subject: [PATCH] FAB-14700 Kafka2Raft validate broadcast Add methods to validate a config update in broadcast phase. Validate a proposed config update versus the current config. Checks the validity of the state transitions of a possible migration config update tx by comparing the current ConsensusType config with the next (proposed) ConsensusType fields. It is called during the broadcast phase and never changes the status of the underlying chain. Change-Id: Ib9b7d75f703d7f7f5eb0c3d514e2bfd4073f34f9 Signed-off-by: Yoav Tock --- orderer/consensus/consensus.go | 3 + orderer/consensus/kafka/chain.go | 120 ++++- orderer/consensus/kafka/chain_test.go | 23 +- orderer/consensus/kafka/consenter_test.go | 8 +- orderer/consensus/migration/migration.go | 18 +- orderer/consensus/migration/migration_test.go | 51 +- orderer/consensus/migration/validation.go | 176 +++++++ .../consensus/migration/validation_test.go | 450 ++++++++++++++++++ .../mocks/consensus_migration_controller.go | 50 +- .../consensus/mocks/mock_consenter_support.go | 64 +++ .../mocks/common/multichannel/multichannel.go | 8 + 11 files changed, 903 insertions(+), 68 deletions(-) create mode 100644 orderer/consensus/migration/validation.go create mode 100644 orderer/consensus/migration/validation_test.go diff --git a/orderer/consensus/consensus.go b/orderer/consensus/consensus.go index 86919ffd1d1..36aa0d70759 100644 --- a/orderer/consensus/consensus.go +++ b/orderer/consensus/consensus.go @@ -89,6 +89,9 @@ type ConsenterSupport interface { // SharedConfig provides the shared config from the channel's current config block. SharedConfig() channelconfig.Orderer + // ChannelConfig provides the channel config from the channel's current config block. + ChannelConfig() channelconfig.Channel + // CreateNextBlock takes a list of messages and creates the next block based on the block with highest block number committed to the ledger // Note that either WriteBlock or WriteConfigBlock must be called before invoking this method a second time. CreateNextBlock(messages []*cb.Envelope) *cb.Block diff --git a/orderer/consensus/kafka/chain.go b/orderer/consensus/kafka/chain.go index 33642a0b53f..bbe184f24b7 100644 --- a/orderer/consensus/kafka/chain.go +++ b/orderer/consensus/kafka/chain.go @@ -248,19 +248,8 @@ func (chain *chainImpl) Configure(config *cb.Envelope, configSeq uint64) error { } func (chain *chainImpl) configure(config *cb.Envelope, configSeq uint64, originalOffset int64) error { - // During consensus-type migration, stop channel creation - if chain.ConsenterSupport.IsSystemChannel() && chain.migrationManager.IsPending() { - ordererTx, err := isOrdererTx(config) - if err != nil { - err = errors.Wrap(err, "cannot determine if config-tx is of type ORDERER_TX, on system channel") - logger.Warning(err) - return err - } - if ordererTx { - str := "cannot enqueue, consensus-type migration pending: ORDERER_TX on system channel, blocking channel creation" - logger.Info(str) - return errors.Errorf(str) - } + if err := chain.filterMigration(config); err != nil { + return errors.Wrap(err, "cannot enqueue") } marshaledConfig, err := protoutil.Marshal(config) @@ -273,6 +262,57 @@ func (chain *chainImpl) configure(config *cb.Envelope, configSeq uint64, origina return nil } +// filterMigration filters out config transactions that should be rejected during consensus-type migration. +// Filtering takes into account the global state of migration, not just the state of the current channel. +func (chain *chainImpl) filterMigration(config *cb.Envelope) error { + migEnabled := chain.ChannelConfig().Capabilities().ConsensusTypeMigration() && chain.SharedConfig().Capabilities().Kafka2RaftMigration() + isOrdTx, isOrdConf, ordConf := chain.getOrdererConfig(config) + + if chain.ConsenterSupport.IsSystemChannel() && isOrdTx { + if migEnabled && chain.migrationManager.IsStartedOrCommitted() { + return errors.Errorf("consensus-type migration pending: ORDERER_TX on system channel, blocking channel creation") + } else { + return nil + } + } + + if !isOrdConf { + return nil + } + + if !migEnabled { + if chain.SharedConfig().ConsensusType() != ordConf.ConsensusType() { + return errors.Errorf("attempted to change consensus type from %s to %s", + chain.SharedConfig().ConsensusType(), ordConf.ConsensusType()) + } + if ordConf.ConsensusMigrationState() != ab.ConsensusType_MIG_STATE_NONE || ordConf.ConsensusMigrationContext() != 0 { + return errors.Errorf("new config has unexpected consensus-migration state or context: (%s/%d) should be (MIG_STATE_NONE/0)", + ordConf.ConsensusMigrationState(), ordConf.ConsensusMigrationContext()) + } + + return nil + } + + currentInfo := &migration.ConsensusTypeInfo{ + Type: chain.SharedConfig().ConsensusType(), + State: chain.SharedConfig().ConsensusMigrationState(), + Context: chain.SharedConfig().ConsensusMigrationContext(), + } + + nextInfo := &migration.ConsensusTypeInfo{ + Type: ordConf.ConsensusType(), + Metadata: ordConf.ConsensusMetadata(), + State: ordConf.ConsensusMigrationState(), + Context: ordConf.ConsensusMigrationContext(), + } + + if valErr := chain.migrationManager.Validate(currentInfo, nextInfo); valErr != nil { + return valErr + } + + return chain.migrationManager.CheckAllowed(nextInfo, chain.consenter.migrationController()) +} + // enqueue accepts a message and returns true on acceptance, or false otherwise. func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool { logger.Debugf("[channel: %s] Enqueueing envelope...", chain.ChainID()) @@ -1304,27 +1344,55 @@ func getHealthyClusterReplicaInfo(retryOptions localconfig.Retry, haltChan chan return replicaIDs, getReplicaInfo.retry() } -// isOrdererTx detects if the config envelope is holding an ORDERER_TX. -// This is only called during consensus-type migration, so the extra work -// (unmarshaling the envelope again) is not that important. -func isOrdererTx(env *cb.Envelope) (bool, error) { - payload, err := protoutil.UnmarshalPayload(env.Payload) +// getOrdererConfig parses the incoming config envelope, classifies it, and returns the OrdererConfig if it exists. +// +// isOrdererTx is true if the config envelope is holding an ORDERER_TRANSACTION. +// isOrdConf is true if the config envelope is holding a CONFIG, and the OrdererConfig exists. +// ordConf carries the OrdererConfig, if it exists. +func (chain *chainImpl) getOrdererConfig(configTx *cb.Envelope) ( + isOrdererTx bool, isOrdConf bool, ordConf channelconfig.Orderer) { + payload, err := protoutil.UnmarshalPayload(configTx.Payload) if err != nil { - return false, err + logger.Errorf("Consensus-type migration: Told to process a config tx, but configtx payload is invalid: %s", err) + return false, false, nil } if payload.Header == nil { - return false, errors.Errorf("Abort processing config msg because no head was set") + logger.Errorf("Consensus-type migration: Told to process a config tx, but configtx payload header is missing") + return false, false, nil } - if payload.Header.ChannelHeader == nil { - return false, errors.Errorf("Abort processing config msg because no channel header was set") + channelHeader, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader) + if err != nil { + logger.Errorf("Consensus-type migration: Told to process a config tx with an invalid channel header: %s", err) + return false, false, nil } - chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader) - if err != nil { - return false, errors.Errorf("Abort processing config msg because channel header unmarshalling error: %s", err) + logger.Debugf("[channel: %s] Consensus-type migration: Processing, header: %s", chain.ChainID(), channelHeader.String()) + + switch channelHeader.Type { + + case int32(cb.HeaderType_ORDERER_TRANSACTION): + isOrdererTx = true + + case int32(cb.HeaderType_CONFIG): + configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data) + if err != nil { + logger.Errorf("Consensus-type migration: Cannot unmarshal config envelope form payload data: %s", err) + return false, false, nil + } + config := configEnvelope.Config + bundle, err := channelconfig.NewBundle(chain.ChainID(), config) + if err != nil { + logger.Errorf("Consensus-type migration: Cannot create new bundle from Config: %s", err) + return false, false, nil + } + + ordConf, isOrdConf = bundle.OrdererConfig() + if !isOrdConf { + logger.Debugf("[channel: %s] Consensus-type migration: Config tx does not have OrdererConfig, ignoring", chain.ChainID()) + } } - return chdr.Type == int32(cb.HeaderType_ORDERER_TRANSACTION), nil + return isOrdererTx, isOrdConf, ordConf } diff --git a/orderer/consensus/kafka/chain_test.go b/orderer/consensus/kafka/chain_test.go index f0eb48218b9..7ce663f8bb6 100644 --- a/orderer/consensus/kafka/chain_test.go +++ b/orderer/consensus/kafka/chain_test.go @@ -70,6 +70,10 @@ func TestChain(t *testing.T) { ChainIDVal: mockChannel.topic(), HeightVal: uint64(3), SharedConfigVal: &mockconfig.Orderer{KafkaBrokersVal: []string{mockBroker.Addr()}}, + ChannelConfigVal: &mockconfig.Channel{ + CapabilitiesVal: &mockconfig.ChannelCapabilities{ + ConsensusTypeMigrationVal: false}, + }, } return } @@ -3036,6 +3040,10 @@ func TestResubmission(t *testing.T) { ResubmissionVal: true, }, }, + ChannelConfigVal: &mockconfig.Channel{ + CapabilitiesVal: &mockconfig.ChannelCapabilities{ + ConsensusTypeMigrationVal: false}, + }, SequenceVal: uint64(2), ProcessConfigMsgVal: newMockConfigEnvelope(), } @@ -3193,6 +3201,10 @@ func TestResubmission(t *testing.T) { ResubmissionVal: true, }, }, + ChannelConfigVal: &mockconfig.Channel{ + CapabilitiesVal: &mockconfig.ChannelCapabilities{ + ConsensusTypeMigrationVal: false}, + }, SequenceVal: uint64(1), ConfigSeqVal: uint64(1), ProcessConfigMsgVal: newMockConfigEnvelope(), @@ -3440,7 +3452,7 @@ func TestDeliverSession(t *testing.T) { defer env.broker2.Close() // initialize consenter - consenter, _ := New(mockLocalConfig, &mockkafka.MetricsProvider{}, &mockkafka.HealthChecker{}, &mockconsensus.FakeMigrationController{}) + consenter, _ := New(mockLocalConfig, &mockkafka.MetricsProvider{}, &mockkafka.HealthChecker{}, &mockconsensus.FakeController{}) // initialize chain metadata := &cb.Metadata{Value: protoutil.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: env.height})} @@ -3529,7 +3541,7 @@ func TestDeliverSession(t *testing.T) { defer env.broker0.Close() // initialize consenter - consenter, _ := New(mockLocalConfig, &mockkafka.MetricsProvider{}, &mockkafka.HealthChecker{}, &mockconsensus.FakeMigrationController{}) + consenter, _ := New(mockLocalConfig, &mockkafka.MetricsProvider{}, &mockkafka.HealthChecker{}, &mockconsensus.FakeController{}) // initialize chain metadata := &cb.Metadata{Value: protoutil.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: env.height})} @@ -3591,7 +3603,7 @@ func TestDeliverSession(t *testing.T) { defer env.broker0.Close() // initialize consenter - consenter, _ := New(mockLocalConfig, &mockkafka.MetricsProvider{}, &mockkafka.HealthChecker{}, &mockconsensus.FakeMigrationController{}) + consenter, _ := New(mockLocalConfig, &mockkafka.MetricsProvider{}, &mockkafka.HealthChecker{}, &mockconsensus.FakeController{}) // initialize chain metadata := &cb.Metadata{Value: protoutil.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: env.height})} @@ -3759,6 +3771,11 @@ func (c *mockConsenterSupport) SharedConfig() channelconfig.Orderer { return args.Get(0).(channelconfig.Orderer) } +func (c *mockConsenterSupport) ChannelConfig() channelconfig.Channel { + args := c.Called() + return args.Get(0).(channelconfig.Channel) +} + func (c *mockConsenterSupport) CreateNextBlock(messages []*cb.Envelope) *cb.Block { args := c.Called(messages) return args.Get(0).(*cb.Block) diff --git a/orderer/consensus/kafka/consenter_test.go b/orderer/consensus/kafka/consenter_test.go index 1eea28da77c..4dae1efe63d 100644 --- a/orderer/consensus/kafka/consenter_test.go +++ b/orderer/consensus/kafka/consenter_test.go @@ -72,12 +72,12 @@ func init() { } func TestNew(t *testing.T) { - c, _ := New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeMigrationController{}) + c, _ := New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeController{}) _ = consensus.Consenter(c) } func TestHandleChain(t *testing.T) { - consenter, _ := New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeMigrationController{}) + consenter, _ := New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeController{}) oldestOffset := int64(0) newestOffset := int64(5) @@ -112,7 +112,7 @@ func TestHandleChain(t *testing.T) { } func TestMigration(t *testing.T) { - consenter, _ := New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeMigrationController{}) + consenter, _ := New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeController{}) consenterimpl := consenter.(*consenterImpl) require.NotNil(t, consenterimpl.migrationController()) assert.NoError(t, consenterimpl.migrationController().ConsensusMigrationStart(111)) @@ -121,7 +121,7 @@ func TestMigration(t *testing.T) { mockLocalConfig.General.GenesisFile = "abc.genesis.block" mockLocalConfig.General.GenesisMethod = "file" - consenter, _ = New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeMigrationController{}) + consenter, _ = New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeController{}) consenterimpl = consenter.(*consenterImpl) assert.Equal(t, mockLocalConfig.General.GenesisFile, consenterimpl.bootstrapFile()) } diff --git a/orderer/consensus/migration/migration.go b/orderer/consensus/migration/migration.go index d06068f389c..9bd721e3106 100644 --- a/orderer/consensus/migration/migration.go +++ b/orderer/consensus/migration/migration.go @@ -31,6 +31,9 @@ type Status interface { // The definition of "committed" differs between the system and standard channels. // Returns true when: COMMIT on system channel; always false on standard channel. IsCommitted() bool + + // IsStartedOrCommitted returns true if consensus-type migration is started or committed on the underlying chain. + IsStartedOrCommitted() bool } // ConsensusTypeInfo carries the fields of protos/orderer/ConsensusType that are contained in a proposed or ordered @@ -147,6 +150,18 @@ func (sm *statusManager) SetStateContext(state orderer.ConsensusType_MigrationSt sm.context = context } +// IsStartedOrCommitted returns true if migration is started or committed. +func (sm *statusManager) IsStartedOrCommitted() bool { + sm.mutex.Lock() + defer sm.mutex.Unlock() + + if sm.systemChannel { + return sm.state == orderer.ConsensusType_MIG_STATE_START || sm.state == orderer.ConsensusType_MIG_STATE_COMMIT + } + + return sm.state == orderer.ConsensusType_MIG_STATE_START || sm.state == orderer.ConsensusType_MIG_STATE_CONTEXT +} + // IsPending returns true if migration is pending. func (sm *statusManager) IsPending() bool { sm.mutex.Lock() @@ -378,6 +393,5 @@ func (sm *statusManager) VerifyRaftMetadata(metadataBytes []byte) error { // Validate checks the validity of the state transitions of a possible migration config update tx by comparing the // current ConsensusTypeInfo config with the next (proposed) ConsensusTypeInfo. func (sm *statusManager) Validate(current, next *ConsensusTypeInfo) error { - //TODO - return nil + return Validate(sm.systemChannel, current, next) } diff --git a/orderer/consensus/migration/migration_test.go b/orderer/consensus/migration/migration_test.go index 6526c4e2e0b..2f3fa0bd046 100644 --- a/orderer/consensus/migration/migration_test.go +++ b/orderer/consensus/migration/migration_test.go @@ -7,9 +7,12 @@ import ( "fmt" "testing" + "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/orderer/consensus/migration" "github.com/hyperledger/fabric/orderer/consensus/mocks" "github.com/hyperledger/fabric/protos/orderer" + protosraft "github.com/hyperledger/fabric/protos/orderer/etcdraft" + "github.com/hyperledger/fabric/protoutil" "github.com/stretchr/testify/assert" ) @@ -90,7 +93,7 @@ func TestStateContextStandard(t *testing.T) { func TestStepSysFromNone(t *testing.T) { sysChan := true - migController := mocks.FakeMigrationController{} + migController := mocks.FakeController{} status := migration.NewManager(sysChan, "test") t.Run("None-None", func(t *testing.T) { @@ -144,7 +147,7 @@ func TestStepSysFromNone(t *testing.T) { func TestStepSysFromStart(t *testing.T) { sysChan := true - migController := mocks.FakeMigrationController{} + migController := mocks.FakeController{} status := migration.NewManager(sysChan, "test") lastBlockCut := uint64(6) context := lastBlockCut + 1 @@ -205,7 +208,7 @@ func TestStepSysFromStart(t *testing.T) { func TestStepSysFromCommit(t *testing.T) { sysChan := true - migController := mocks.FakeMigrationController{} + migController := mocks.FakeController{} status := migration.NewManager(sysChan, "test") lastBlockCut := uint64(6) context := lastBlockCut + 1 @@ -265,7 +268,7 @@ func TestStepSysFromAbort(t *testing.T) { func TestStepSysFromContext(t *testing.T) { sysChan := true - migController := mocks.FakeMigrationController{} + migController := mocks.FakeController{} status := migration.NewManager(sysChan, "test") lastBlockCut := uint64(6) context := lastBlockCut + 1 @@ -300,7 +303,7 @@ func TestStepSysFromContext(t *testing.T) { func TestStepStdFromNone(t *testing.T) { sysChan := false - migController := mocks.FakeMigrationController{} + migController := mocks.FakeController{} status := migration.NewManager(sysChan, "test") t.Run("None-None", func(t *testing.T) { @@ -342,7 +345,7 @@ func TestStepStdFromNone(t *testing.T) { func TestStepStdFromStart(t *testing.T) { sysChan := false - migController := mocks.FakeMigrationController{} + migController := mocks.FakeController{} status := migration.NewManager(sysChan, "test") lastBlockCut := uint64(6) context := lastBlockCut + 1 @@ -391,7 +394,7 @@ func TestStepStdFromStart(t *testing.T) { func TestStepStdFromContext(t *testing.T) { sysChan := false - migController := mocks.FakeMigrationController{} + migController := mocks.FakeController{} status := migration.NewManager(sysChan, "test") lastBlockCut := uint64(6) context := lastBlockCut + 1 @@ -433,7 +436,7 @@ func TestStepStdFromContext(t *testing.T) { func TestStepStdFromCommit(t *testing.T) { sysChan := false - migController := mocks.FakeMigrationController{} + migController := mocks.FakeController{} status := migration.NewManager(sysChan, "test") lastBlockCut := uint64(6) context := lastBlockCut + 1 @@ -454,3 +457,35 @@ func TestStepStdFromCommit(t *testing.T) { } }) } + +func prepareRaftMetadata() *protosraft.Metadata { + raftMetadata := &protosraft.Metadata{ + Consenters: []*protosraft.Consenter{ + { + ClientTlsCert: []byte{1, 2, 3, 4}, + ServerTlsCert: []byte{5, 6, 7, 8}, + Host: "127.0.0.1", + Port: uint32(31001), + }, + }, + Options: &protosraft.Options{ + TickInterval: "500ms", + ElectionTick: 10, + HeartbeatTick: 1, + MaxInflightMsgs: 256, + MaxSizePerMsg: 1048576, + SnapshotInterval: 8388608, + }} + + return raftMetadata +} + +func prepareRaftMetadataBytes(t *testing.T) []byte { + raftMetadata := prepareRaftMetadata() + raftMetadataBytes := protoutil.MarshalOrPanic(raftMetadata) + raftMetadata2 := &protosraft.Metadata{} + errUnma := proto.Unmarshal(raftMetadataBytes, raftMetadata2) + assert.NoError(t, errUnma) + + return raftMetadataBytes +} diff --git a/orderer/consensus/migration/validation.go b/orderer/consensus/migration/validation.go new file mode 100644 index 00000000000..f3f309982ee --- /dev/null +++ b/orderer/consensus/migration/validation.go @@ -0,0 +1,176 @@ +// Copyright IBM Corp. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package migration + +import ( + "github.com/hyperledger/fabric/protos/orderer" + "github.com/pkg/errors" +) + +// Validate checks the validity of the state transitions of a possible migration config update tx by comparing the +// current ConsensusTypeInfo config with the next (proposed) ConsensusTypeInfo. It is called during the broadcast +// phase and never changes the status of the underlying chain. +func Validate(systemChannel bool, current, next *ConsensusTypeInfo) error { + if systemChannel { + return validateSystem(current, next) + } + + return validateStandard(current, next) +} + +func validateSystem(current, next *ConsensusTypeInfo) error { + // Check validity of new state, type, and context + unExpCtx := "Consensus-type migration, state=%s, unexpected context, actual=%d (expected=%s)" + unExpType := "Consensus-type migration, state=%s, unexpected type, actual=%s (expected=%s)" + switch next.State { + case orderer.ConsensusType_MIG_STATE_NONE: + if next.Context != 0 { + return errors.Errorf(unExpCtx, next.State, next.Context, "0") + } + case orderer.ConsensusType_MIG_STATE_START: + if next.Type != "kafka" { + return errors.Errorf(unExpType, next.State, next.Type, "kafka") + } + if next.Context != 0 { + return errors.Errorf(unExpCtx, next.State, next.Context, "0") + } + case orderer.ConsensusType_MIG_STATE_COMMIT: + if next.Type != "etcdraft" { + return errors.Errorf(unExpType, next.State, next.Type, "etcdraft") + } + if next.Context <= 0 { + return errors.Errorf(unExpCtx, next.State, next.Context, ">0") + } + case orderer.ConsensusType_MIG_STATE_ABORT: + if next.Type != "kafka" { + return errors.Errorf(unExpType, next.State, next.Type, "kafka") + } + if next.Context <= 0 { + return errors.Errorf(unExpCtx, next.State, next.Context, ">0") + } + default: + return errors.Errorf("Consensus-type migration, state=%s, not permitted on system channel", next.State) + } + + // The following code explicitly checks for permitted transitions; all other transitions return an error. + if current.Type != next.Type { + if current.Type == "kafka" && next.Type == "etcdraft" { + // On the system channels, this is permitted, green path commit + isSysCommit := (current.State == orderer.ConsensusType_MIG_STATE_START) && (next.State == orderer.ConsensusType_MIG_STATE_COMMIT) + if !isSysCommit { + return errors.Errorf("Attempted to change consensus type from %s to %s, unexpected state transition: %s to %s", + current.Type, next.Type, current.State, next.State) + } + } else if current.Type == "etcdraft" && next.Type == "kafka" { + return errors.Errorf("Attempted to change consensus type from %s to %s, not permitted on system channel", current.Type, next.Type) + } else { + return errors.Errorf("Attempted to change consensus type from %s to %s, not supported", current.Type, next.Type) + } + } else { + // This is always permitted, not a migration + isNotMig := (current.State == orderer.ConsensusType_MIG_STATE_NONE) && (next.State == current.State) + if isNotMig { + return nil + } + + // Migration state may change when the type stays the same + if current.Type == "kafka" { + // In the "green" path: the system channel starts migration + isStart := (current.State == orderer.ConsensusType_MIG_STATE_NONE) && (next.State == orderer.ConsensusType_MIG_STATE_START) + // In the "abort" path: the system channel aborts a migration + isAbort := (current.State == orderer.ConsensusType_MIG_STATE_START) && (next.State == orderer.ConsensusType_MIG_STATE_ABORT) + // In the "abort" path: the system channel reconfigures after an abort, not a migration + isNotMigAfterAbort := (current.State == orderer.ConsensusType_MIG_STATE_ABORT) && (next.State == orderer.ConsensusType_MIG_STATE_NONE) + // In the "abort" path: the system channel starts a new migration attempt after an abort + isStartAfterAbort := (current.State == orderer.ConsensusType_MIG_STATE_ABORT) && (next.State == orderer.ConsensusType_MIG_STATE_START) + if !(isNotMig || isStart || isAbort || isNotMigAfterAbort || isStartAfterAbort) { + return errors.Errorf("Consensus type %s, unexpected migration state transition: %s to %s", + current.Type, current.State, next.State) + } + } else if current.Type == "etcdraft" { + // In the "green" path: the system channel reconfigures after a successful migration + isConfAfterSuccess := (current.State == orderer.ConsensusType_MIG_STATE_COMMIT) && (next.State == orderer.ConsensusType_MIG_STATE_NONE) + if !(isNotMig || isConfAfterSuccess) { + return errors.Errorf("Consensus type %s, unexpected migration state transition: %s to %s", + current.Type, current.State.String(), next.State) + } + } + } + + return nil +} + +func validateStandard(current, next *ConsensusTypeInfo) error { + // Check validity of new state, type, and context + unExpCtx := "Consensus-type migration, state=%s, unexpected context, actual=%d (expected=%s)" + switch next.State { + case orderer.ConsensusType_MIG_STATE_NONE: + if next.Context != 0 { + return errors.Errorf(unExpCtx, next.State, next.Context, "0") + } + case orderer.ConsensusType_MIG_STATE_CONTEXT: + if next.Type != "etcdraft" { + unExpType := "Consensus-type migration, state=%s, unexpected type, actual=%s (expected=%s)" + return errors.Errorf(unExpType, next.State, next.Type, "etcdraft") + } + if next.Context <= 0 { + return errors.Errorf(unExpCtx, next.State, next.Context, ">0") + } + case orderer.ConsensusType_MIG_STATE_ABORT: + if next.Type != "kafka" { + unExpType := "Consensus-type migration, state=%s, unexpected type, actual=%s (expected=%s)" + return errors.Errorf(unExpType, next.State, next.Type, "kafka") + } + if next.Context <= 0 { + return errors.Errorf(unExpCtx, next.State, next.Context, ">0") + } + default: + return errors.Errorf("Consensus-type migration, state=%s, not permitted on standard channel", next.State) + } + + // The following code explicitly checks for permitted transitions; all other transitions return an error. + if current.Type != next.Type { + badAttemptStr := "Attempted to change consensus type from %s to %s, unexpected state transition: %s to %s" + if current.Type == "kafka" && next.Type == "etcdraft" { + // On the standard channels, this is permitted, green path context + isCtx := (current.State == orderer.ConsensusType_MIG_STATE_NONE) && (next.State == orderer.ConsensusType_MIG_STATE_CONTEXT) + isCtxAfterAbort := (current.State == orderer.ConsensusType_MIG_STATE_ABORT) && (next.State == orderer.ConsensusType_MIG_STATE_CONTEXT) + if !(isCtx || isCtxAfterAbort) { + return errors.Errorf(badAttemptStr, current.Type, next.Type, current.State, next.State) + } + } else if current.Type == "etcdraft" && next.Type == "kafka" { + // On the standard channels, this is permitted, abort path + isAbort := (current.State == orderer.ConsensusType_MIG_STATE_CONTEXT) && (next.State == orderer.ConsensusType_MIG_STATE_ABORT) + if !isAbort { + return errors.Errorf(badAttemptStr, current.Type, next.Type, current.State, next.State) + } + } else { + return errors.Errorf("Attempted to change consensus type from %s to %s, not supported", current.Type, next.Type) + } + } else { + // This is always permitted, not a migration + isNotMig := (current.State == orderer.ConsensusType_MIG_STATE_NONE) && (next.State == current.State) + if isNotMig { + return nil + } + + // Migration state may change when the type stays the same + if current.Type == "etcdraft" { + // In the "green" path: a channel reconfigures after a successful migration + isConfigAfterSuccess := (current.State == orderer.ConsensusType_MIG_STATE_CONTEXT) && (next.State == orderer.ConsensusType_MIG_STATE_NONE) + if !isConfigAfterSuccess { + return errors.Errorf("Consensus type %s, unexpected migration state transition: %s to %s", current.Type, current.State, next.State) + } + } else if current.Type == "kafka" { + isAbort := (current.State == orderer.ConsensusType_MIG_STATE_NONE) && (next.State == orderer.ConsensusType_MIG_STATE_ABORT) + isConfigAfterAbort := (current.State == orderer.ConsensusType_MIG_STATE_ABORT) && (next.State == orderer.ConsensusType_MIG_STATE_NONE) + // Not a migration + if !(isAbort || isConfigAfterAbort) { + return errors.Errorf("Consensus type %s, unexpected migration state transition: %s to %s", current.Type, current.State, next.State) + } + } + } + + return nil +} diff --git a/orderer/consensus/migration/validation_test.go b/orderer/consensus/migration/validation_test.go new file mode 100644 index 00000000000..bef1a3d3c2c --- /dev/null +++ b/orderer/consensus/migration/validation_test.go @@ -0,0 +1,450 @@ +// Copyright IBM Corp. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package migration_test + +import ( + "testing" + + "github.com/hyperledger/fabric/orderer/consensus/migration" + "github.com/hyperledger/fabric/protos/orderer" + "github.com/stretchr/testify/assert" +) + +func TestSystemNormal(t *testing.T) { + kafkaMetadata := []byte{} + raftMetadata := prepareRaftMetadataBytes(t) + + t.Run("Green Path on System Channel", func(t *testing.T) { + curr := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + next := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + + err := migration.Validate(true, curr, next) + assert.NoError(t, err, "None to None") + + next.State = orderer.ConsensusType_MIG_STATE_START + err = migration.Validate(true, curr, next) + assert.NoError(t, err, "None to Start") + + curr = next + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_COMMIT, 7) + err = migration.Validate(true, curr, next) + assert.NoError(t, err, "Start to Commit") + + curr = next + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + err = migration.Validate(true, curr, next) + assert.NoError(t, err, "Commit to None") + + curr = next + err = migration.Validate(true, curr, next) + assert.NoError(t, err, "None to None") + }) + + t.Run("Abort Path on System Channel", func(t *testing.T) { + curr := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + next := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + + err := migration.Validate(true, curr, next) + assert.NoError(t, err, "None to None") + + next.State = orderer.ConsensusType_MIG_STATE_START + err = migration.Validate(true, curr, next) + assert.NoError(t, err, "None to Start") + + curr = next + next = createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 7) + err = migration.Validate(true, curr, next) + assert.NoError(t, err, "Start to Abort") + + curr = next + next = createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_START, 0) + err = migration.Validate(true, curr, next) + assert.NoError(t, err, "Abort to Start") + + curr = next + next = createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 7) + err = migration.Validate(true, curr, next) + assert.NoError(t, err, "Start to Abort") + + curr = next + next = createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + err = migration.Validate(true, curr, next) + assert.NoError(t, err, "Abort to None") + }) +} + +func TestStandardNormal(t *testing.T) { + kafkaMetadata := []byte{} + raftMetadata := prepareRaftMetadataBytes(t) + + t.Run("Green Path on Standard Channel", func(t *testing.T) { + curr := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + next := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + + err := migration.Validate(false, curr, next) + assert.NoError(t, err, "None to None") + + curr = next + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_CONTEXT, 7) + err = migration.Validate(false, curr, next) + assert.NoError(t, err, "None to Context") + + curr = next + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + err = migration.Validate(false, curr, next) + assert.NoError(t, err, "Context to None") + + curr = next + err = migration.Validate(false, curr, next) + assert.NoError(t, err, "None to None") + }) + + t.Run("Abort Path on Standard Channel", func(t *testing.T) { + curr := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + next := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + + err := migration.Validate(false, curr, next) + assert.NoError(t, err, "None to None") + + curr = next + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_CONTEXT, 7) + err = migration.Validate(false, curr, next) + assert.NoError(t, err, "None to Context") + + curr = next + next = createInfo("kafka", raftMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 7) + err = migration.Validate(false, curr, next) + assert.NoError(t, err, "Context to Abort") + + curr = next + next = createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + err = migration.Validate(false, curr, next) + assert.NoError(t, err, "Abort to None") + + curr = next + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_CONTEXT, 7) + err = migration.Validate(false, curr, next) + assert.NoError(t, err, "None to Context") + + curr = next + next = createInfo("kafka", raftMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 7) + err = migration.Validate(false, curr, next) + assert.NoError(t, err, "Context to Abort") + + curr = next + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_CONTEXT, 7) + err = migration.Validate(false, curr, next) + assert.NoError(t, err, "Abort to Context") + }) +} + +func TestSystemBadTrans(t *testing.T) { + kafkaMetadata := []byte{} + raftMetadata := prepareRaftMetadataBytes(t) + + t.Run("Bad transitions on System Channel, from NONE", func(t *testing.T) { + curr := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + + next := createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_COMMIT, 7) + err := migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Attempted to change consensus type from kafka to etcdraft, unexpected state transition: MIG_STATE_NONE to MIG_STATE_COMMIT") + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_CONTEXT, 7) + err = migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_CONTEXT, not permitted on system channel") + }) + + t.Run("Bad transitions on System Channel, from START", func(t *testing.T) { + curr := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_START, 0) + + next := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + err := migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus type kafka, unexpected migration state transition: MIG_STATE_START to MIG_STATE_NONE") + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + err = migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Attempted to change consensus type from kafka to etcdraft, unexpected state transition: MIG_STATE_START to MIG_STATE_NONE") + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_CONTEXT, 7) + err = migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_CONTEXT, not permitted on system channel") + + next = createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_START, 0) + err = migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus type kafka, unexpected migration state transition: MIG_STATE_START to MIG_STATE_START") + }) + + t.Run("Bad transitions on System Channel, from COMMIT", func(t *testing.T) { + curr := createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_COMMIT, 7) + + next := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + err := migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Attempted to change consensus type from etcdraft to kafka, not permitted on system channel") + + next = createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_START, 0) + err = migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Attempted to change consensus type from etcdraft to kafka, not permitted on system channel") + + next = createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 7) + err = migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Attempted to change consensus type from etcdraft to kafka, not permitted on system channel") + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_COMMIT, 8) + err = migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus type etcdraft, unexpected migration state transition: MIG_STATE_COMMIT to MIG_STATE_COMMIT") + }) + + t.Run("Bad transitions on System Channel, from ABORT", func(t *testing.T) { + curr := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 7) + + next := createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_COMMIT, 7) + err := migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Attempted to change consensus type from kafka to etcdraft, unexpected state transition: MIG_STATE_ABORT to MIG_STATE_COMMIT") + + next = createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 8) + err = migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus type kafka, unexpected migration state transition: MIG_STATE_ABORT to MIG_STATE_ABORT") + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_CONTEXT, 8) + err = migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_CONTEXT, not permitted on system channel") + }) +} + +func TestStandardBadTrans(t *testing.T) { + kafkaMetadata := []byte{} + raftMetadata := prepareRaftMetadataBytes(t) + + t.Run("Bad transitions on Standard Channel, from NONE", func(t *testing.T) { + curr := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + + next := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_START, 0) + err := migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_START, not permitted on standard channel") + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_COMMIT, 7) + err = migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_COMMIT, not permitted on standard channel") + }) + + t.Run("Bad transitions on Standard Channel, from CONTEXT", func(t *testing.T) { + curr := createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_CONTEXT, 7) + + next := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_START, 0) + err := migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_START, not permitted on standard channel") + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_COMMIT, 7) + err = migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_COMMIT, not permitted on standard channel") + + next = createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + err = migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Attempted to change consensus type from etcdraft to kafka, unexpected state transition: MIG_STATE_CONTEXT to MIG_STATE_NONE") + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_CONTEXT, 7) + err = migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Consensus type etcdraft, unexpected migration state transition: MIG_STATE_CONTEXT to MIG_STATE_CONTEXT") + }) + + t.Run("Bad transitions on Standard Channel, from ABORT", func(t *testing.T) { + curr := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 7) + + next := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_START, 0) + err := migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_START, not permitted on standard channel") + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_COMMIT, 7) + err = migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_COMMIT, not permitted on standard channel") + }) +} + +func TestSystemBadNext(t *testing.T) { + kafkaMetadata := []byte{} + raftMetadata := prepareRaftMetadataBytes(t) + currLegal := []*migration.ConsensusTypeInfo{ + createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0), + createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_START, 0), + createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 7), + createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_COMMIT, 7), + createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0), + } + + t.Run("Bad next NONE", func(t *testing.T) { + next := createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 7) + for _, curr := range currLegal { + err := migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_NONE, unexpected context, actual=7 (expected=0)") + } + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_NONE, 7) + for _, curr := range currLegal { + err := migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_NONE, unexpected context, actual=7 (expected=0)") + } + }) + + t.Run("Bad next START", func(t *testing.T) { + next := createInfo("kafka", raftMetadata, orderer.ConsensusType_MIG_STATE_START, 1) + for _, curr := range currLegal { + err := migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_START, unexpected context, actual=1 (expected=0)") + } + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_START, 0) + for _, curr := range currLegal { + err := migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_START, unexpected type, actual=etcdraft (expected=kafka)") + } + }) + + t.Run("Bad next COMMIT", func(t *testing.T) { + next := createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_COMMIT, 0) + for _, curr := range currLegal { + err := migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_COMMIT, unexpected context, actual=0 (expected=>0)") + } + + next = createInfo("kafka", raftMetadata, orderer.ConsensusType_MIG_STATE_COMMIT, 7) + for _, curr := range currLegal { + err := migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_COMMIT, unexpected type, actual=kafka (expected=etcdraft)") + } + }) + + t.Run("Bad next ABORT", func(t *testing.T) { + next := createInfo("kafka", raftMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 0) + for _, curr := range currLegal { + err := migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_ABORT, unexpected context, actual=0 (expected=>0)") + } + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 7) + for _, curr := range currLegal { + err := migration.Validate(true, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_ABORT, unexpected type, actual=etcdraft (expected=kafka)") + } + }) + + t.Run("Bad next unsupported", func(t *testing.T) { + next := createInfo("solo", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + for _, curr := range currLegal { + err := migration.Validate(true, curr, next) + assert.Contains(t, err.Error(), "Attempted to change consensus type from") + assert.Contains(t, err.Error(), "to solo, not supported") + } + }) +} + +func TestStandardBadNext(t *testing.T) { + kafkaMetadata := []byte{} + raftMetadata := prepareRaftMetadataBytes(t) + currLegal := []*migration.ConsensusTypeInfo{ + createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0), + createInfo("kafka", kafkaMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 7), + createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_CONTEXT, 7), + createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0), + } + + t.Run("Bad next NONE", func(t *testing.T) { + next := createInfo("kafka", raftMetadata, orderer.ConsensusType_MIG_STATE_NONE, 7) + for _, curr := range currLegal { + err := migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_NONE, unexpected context, actual=7 (expected=0)") + } + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_NONE, 7) + for _, curr := range currLegal { + err := migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_NONE, unexpected context, actual=7 (expected=0)") + } + }) + + t.Run("Bad next ABORT", func(t *testing.T) { + next := createInfo("kafka", raftMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 0) + for _, curr := range currLegal { + err := migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_ABORT, unexpected context, actual=0 (expected=>0)") + } + + next = createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_ABORT, 7) + for _, curr := range currLegal { + err := migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_ABORT, unexpected type, actual=etcdraft (expected=kafka)") + } + }) + + t.Run("Bad next CONTEXT", func(t *testing.T) { + next := createInfo("etcdraft", raftMetadata, orderer.ConsensusType_MIG_STATE_CONTEXT, 0) + for _, curr := range currLegal { + err := migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_CONTEXT, unexpected context, actual=0 (expected=>0)") + } + + next = createInfo("kafka", raftMetadata, orderer.ConsensusType_MIG_STATE_CONTEXT, 7) + for _, curr := range currLegal { + err := migration.Validate(false, curr, next) + assert.EqualError(t, err, + "Consensus-type migration, state=MIG_STATE_CONTEXT, unexpected type, actual=kafka (expected=etcdraft)") + } + }) + + t.Run("Bad next unsupported", func(t *testing.T) { + next := createInfo("solo", kafkaMetadata, orderer.ConsensusType_MIG_STATE_NONE, 0) + for _, curr := range currLegal { + err := migration.Validate(false, curr, next) + assert.Contains(t, err.Error(), "Attempted to change consensus type from") + assert.Contains(t, err.Error(), "to solo, not supported") + } + }) +} + +func createInfo( + consensusType string, + kafkaMetadata []byte, + state orderer.ConsensusType_MigrationState, + context uint64) *migration.ConsensusTypeInfo { + info := &migration.ConsensusTypeInfo{ + Type: consensusType, + Metadata: kafkaMetadata, + State: state, + Context: context, + } + return info +} diff --git a/orderer/consensus/mocks/consensus_migration_controller.go b/orderer/consensus/mocks/consensus_migration_controller.go index 2d97e576c4c..b55d04ac8a9 100644 --- a/orderer/consensus/mocks/consensus_migration_controller.go +++ b/orderer/consensus/mocks/consensus_migration_controller.go @@ -7,7 +7,7 @@ import ( migration "github.com/hyperledger/fabric/orderer/consensus/migration" ) -type FakeMigrationController struct { +type FakeController struct { ConsensusMigrationAbortStub func() error consensusMigrationAbortMutex sync.RWMutex consensusMigrationAbortArgsForCall []struct { @@ -53,7 +53,7 @@ type FakeMigrationController struct { invocationsMutex sync.RWMutex } -func (fake *FakeMigrationController) ConsensusMigrationAbort() error { +func (fake *FakeController) ConsensusMigrationAbort() error { fake.consensusMigrationAbortMutex.Lock() ret, specificReturn := fake.consensusMigrationAbortReturnsOnCall[len(fake.consensusMigrationAbortArgsForCall)] fake.consensusMigrationAbortArgsForCall = append(fake.consensusMigrationAbortArgsForCall, struct { @@ -70,19 +70,19 @@ func (fake *FakeMigrationController) ConsensusMigrationAbort() error { return fakeReturns.result1 } -func (fake *FakeMigrationController) ConsensusMigrationAbortCallCount() int { +func (fake *FakeController) ConsensusMigrationAbortCallCount() int { fake.consensusMigrationAbortMutex.RLock() defer fake.consensusMigrationAbortMutex.RUnlock() return len(fake.consensusMigrationAbortArgsForCall) } -func (fake *FakeMigrationController) ConsensusMigrationAbortCalls(stub func() error) { +func (fake *FakeController) ConsensusMigrationAbortCalls(stub func() error) { fake.consensusMigrationAbortMutex.Lock() defer fake.consensusMigrationAbortMutex.Unlock() fake.ConsensusMigrationAbortStub = stub } -func (fake *FakeMigrationController) ConsensusMigrationAbortReturns(result1 error) { +func (fake *FakeController) ConsensusMigrationAbortReturns(result1 error) { fake.consensusMigrationAbortMutex.Lock() defer fake.consensusMigrationAbortMutex.Unlock() fake.ConsensusMigrationAbortStub = nil @@ -91,7 +91,7 @@ func (fake *FakeMigrationController) ConsensusMigrationAbortReturns(result1 erro }{result1} } -func (fake *FakeMigrationController) ConsensusMigrationAbortReturnsOnCall(i int, result1 error) { +func (fake *FakeController) ConsensusMigrationAbortReturnsOnCall(i int, result1 error) { fake.consensusMigrationAbortMutex.Lock() defer fake.consensusMigrationAbortMutex.Unlock() fake.ConsensusMigrationAbortStub = nil @@ -105,7 +105,7 @@ func (fake *FakeMigrationController) ConsensusMigrationAbortReturnsOnCall(i int, }{result1} } -func (fake *FakeMigrationController) ConsensusMigrationCommit() error { +func (fake *FakeController) ConsensusMigrationCommit() error { fake.consensusMigrationCommitMutex.Lock() ret, specificReturn := fake.consensusMigrationCommitReturnsOnCall[len(fake.consensusMigrationCommitArgsForCall)] fake.consensusMigrationCommitArgsForCall = append(fake.consensusMigrationCommitArgsForCall, struct { @@ -122,19 +122,19 @@ func (fake *FakeMigrationController) ConsensusMigrationCommit() error { return fakeReturns.result1 } -func (fake *FakeMigrationController) ConsensusMigrationCommitCallCount() int { +func (fake *FakeController) ConsensusMigrationCommitCallCount() int { fake.consensusMigrationCommitMutex.RLock() defer fake.consensusMigrationCommitMutex.RUnlock() return len(fake.consensusMigrationCommitArgsForCall) } -func (fake *FakeMigrationController) ConsensusMigrationCommitCalls(stub func() error) { +func (fake *FakeController) ConsensusMigrationCommitCalls(stub func() error) { fake.consensusMigrationCommitMutex.Lock() defer fake.consensusMigrationCommitMutex.Unlock() fake.ConsensusMigrationCommitStub = stub } -func (fake *FakeMigrationController) ConsensusMigrationCommitReturns(result1 error) { +func (fake *FakeController) ConsensusMigrationCommitReturns(result1 error) { fake.consensusMigrationCommitMutex.Lock() defer fake.consensusMigrationCommitMutex.Unlock() fake.ConsensusMigrationCommitStub = nil @@ -143,7 +143,7 @@ func (fake *FakeMigrationController) ConsensusMigrationCommitReturns(result1 err }{result1} } -func (fake *FakeMigrationController) ConsensusMigrationCommitReturnsOnCall(i int, result1 error) { +func (fake *FakeController) ConsensusMigrationCommitReturnsOnCall(i int, result1 error) { fake.consensusMigrationCommitMutex.Lock() defer fake.consensusMigrationCommitMutex.Unlock() fake.ConsensusMigrationCommitStub = nil @@ -157,7 +157,7 @@ func (fake *FakeMigrationController) ConsensusMigrationCommitReturnsOnCall(i int }{result1} } -func (fake *FakeMigrationController) ConsensusMigrationPending() bool { +func (fake *FakeController) ConsensusMigrationPending() bool { fake.consensusMigrationPendingMutex.Lock() ret, specificReturn := fake.consensusMigrationPendingReturnsOnCall[len(fake.consensusMigrationPendingArgsForCall)] fake.consensusMigrationPendingArgsForCall = append(fake.consensusMigrationPendingArgsForCall, struct { @@ -174,19 +174,19 @@ func (fake *FakeMigrationController) ConsensusMigrationPending() bool { return fakeReturns.result1 } -func (fake *FakeMigrationController) ConsensusMigrationPendingCallCount() int { +func (fake *FakeController) ConsensusMigrationPendingCallCount() int { fake.consensusMigrationPendingMutex.RLock() defer fake.consensusMigrationPendingMutex.RUnlock() return len(fake.consensusMigrationPendingArgsForCall) } -func (fake *FakeMigrationController) ConsensusMigrationPendingCalls(stub func() bool) { +func (fake *FakeController) ConsensusMigrationPendingCalls(stub func() bool) { fake.consensusMigrationPendingMutex.Lock() defer fake.consensusMigrationPendingMutex.Unlock() fake.ConsensusMigrationPendingStub = stub } -func (fake *FakeMigrationController) ConsensusMigrationPendingReturns(result1 bool) { +func (fake *FakeController) ConsensusMigrationPendingReturns(result1 bool) { fake.consensusMigrationPendingMutex.Lock() defer fake.consensusMigrationPendingMutex.Unlock() fake.ConsensusMigrationPendingStub = nil @@ -195,7 +195,7 @@ func (fake *FakeMigrationController) ConsensusMigrationPendingReturns(result1 bo }{result1} } -func (fake *FakeMigrationController) ConsensusMigrationPendingReturnsOnCall(i int, result1 bool) { +func (fake *FakeController) ConsensusMigrationPendingReturnsOnCall(i int, result1 bool) { fake.consensusMigrationPendingMutex.Lock() defer fake.consensusMigrationPendingMutex.Unlock() fake.ConsensusMigrationPendingStub = nil @@ -209,7 +209,7 @@ func (fake *FakeMigrationController) ConsensusMigrationPendingReturnsOnCall(i in }{result1} } -func (fake *FakeMigrationController) ConsensusMigrationStart(arg1 uint64) error { +func (fake *FakeController) ConsensusMigrationStart(arg1 uint64) error { fake.consensusMigrationStartMutex.Lock() ret, specificReturn := fake.consensusMigrationStartReturnsOnCall[len(fake.consensusMigrationStartArgsForCall)] fake.consensusMigrationStartArgsForCall = append(fake.consensusMigrationStartArgsForCall, struct { @@ -227,26 +227,26 @@ func (fake *FakeMigrationController) ConsensusMigrationStart(arg1 uint64) error return fakeReturns.result1 } -func (fake *FakeMigrationController) ConsensusMigrationStartCallCount() int { +func (fake *FakeController) ConsensusMigrationStartCallCount() int { fake.consensusMigrationStartMutex.RLock() defer fake.consensusMigrationStartMutex.RUnlock() return len(fake.consensusMigrationStartArgsForCall) } -func (fake *FakeMigrationController) ConsensusMigrationStartCalls(stub func(uint64) error) { +func (fake *FakeController) ConsensusMigrationStartCalls(stub func(uint64) error) { fake.consensusMigrationStartMutex.Lock() defer fake.consensusMigrationStartMutex.Unlock() fake.ConsensusMigrationStartStub = stub } -func (fake *FakeMigrationController) ConsensusMigrationStartArgsForCall(i int) uint64 { +func (fake *FakeController) ConsensusMigrationStartArgsForCall(i int) uint64 { fake.consensusMigrationStartMutex.RLock() defer fake.consensusMigrationStartMutex.RUnlock() argsForCall := fake.consensusMigrationStartArgsForCall[i] return argsForCall.arg1 } -func (fake *FakeMigrationController) ConsensusMigrationStartReturns(result1 error) { +func (fake *FakeController) ConsensusMigrationStartReturns(result1 error) { fake.consensusMigrationStartMutex.Lock() defer fake.consensusMigrationStartMutex.Unlock() fake.ConsensusMigrationStartStub = nil @@ -255,7 +255,7 @@ func (fake *FakeMigrationController) ConsensusMigrationStartReturns(result1 erro }{result1} } -func (fake *FakeMigrationController) ConsensusMigrationStartReturnsOnCall(i int, result1 error) { +func (fake *FakeController) ConsensusMigrationStartReturnsOnCall(i int, result1 error) { fake.consensusMigrationStartMutex.Lock() defer fake.consensusMigrationStartMutex.Unlock() fake.ConsensusMigrationStartStub = nil @@ -269,7 +269,7 @@ func (fake *FakeMigrationController) ConsensusMigrationStartReturnsOnCall(i int, }{result1} } -func (fake *FakeMigrationController) Invocations() map[string][][]interface{} { +func (fake *FakeController) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() fake.consensusMigrationAbortMutex.RLock() @@ -287,7 +287,7 @@ func (fake *FakeMigrationController) Invocations() map[string][][]interface{} { return copiedInvocations } -func (fake *FakeMigrationController) recordInvocation(key string, args []interface{}) { +func (fake *FakeController) recordInvocation(key string, args []interface{}) { fake.invocationsMutex.Lock() defer fake.invocationsMutex.Unlock() if fake.invocations == nil { @@ -299,4 +299,4 @@ func (fake *FakeMigrationController) recordInvocation(key string, args []interfa fake.invocations[key] = append(fake.invocations[key], args) } -var _ migration.Controller = new(FakeMigrationController) +var _ migration.Controller = new(FakeController) diff --git a/orderer/consensus/mocks/mock_consenter_support.go b/orderer/consensus/mocks/mock_consenter_support.go index 8a082a95c21..5ac51677e3d 100644 --- a/orderer/consensus/mocks/mock_consenter_support.go +++ b/orderer/consensus/mocks/mock_consenter_support.go @@ -55,6 +55,16 @@ type FakeConsenterSupport struct { chainIDReturnsOnCall map[int]struct { result1 string } + ChannelConfigStub func() channelconfig.Channel + channelConfigMutex sync.RWMutex + channelConfigArgsForCall []struct { + } + channelConfigReturns struct { + result1 channelconfig.Channel + } + channelConfigReturnsOnCall map[int]struct { + result1 channelconfig.Channel + } ClassifyMsgStub func(*common.ChannelHeader) msgprocessor.Classification classifyMsgMutex sync.RWMutex classifyMsgArgsForCall []struct { @@ -437,6 +447,58 @@ func (fake *FakeConsenterSupport) ChainIDReturnsOnCall(i int, result1 string) { }{result1} } +func (fake *FakeConsenterSupport) ChannelConfig() channelconfig.Channel { + fake.channelConfigMutex.Lock() + ret, specificReturn := fake.channelConfigReturnsOnCall[len(fake.channelConfigArgsForCall)] + fake.channelConfigArgsForCall = append(fake.channelConfigArgsForCall, struct { + }{}) + fake.recordInvocation("ChannelConfig", []interface{}{}) + fake.channelConfigMutex.Unlock() + if fake.ChannelConfigStub != nil { + return fake.ChannelConfigStub() + } + if specificReturn { + return ret.result1 + } + fakeReturns := fake.channelConfigReturns + return fakeReturns.result1 +} + +func (fake *FakeConsenterSupport) ChannelConfigCallCount() int { + fake.channelConfigMutex.RLock() + defer fake.channelConfigMutex.RUnlock() + return len(fake.channelConfigArgsForCall) +} + +func (fake *FakeConsenterSupport) ChannelConfigCalls(stub func() channelconfig.Channel) { + fake.channelConfigMutex.Lock() + defer fake.channelConfigMutex.Unlock() + fake.ChannelConfigStub = stub +} + +func (fake *FakeConsenterSupport) ChannelConfigReturns(result1 channelconfig.Channel) { + fake.channelConfigMutex.Lock() + defer fake.channelConfigMutex.Unlock() + fake.ChannelConfigStub = nil + fake.channelConfigReturns = struct { + result1 channelconfig.Channel + }{result1} +} + +func (fake *FakeConsenterSupport) ChannelConfigReturnsOnCall(i int, result1 channelconfig.Channel) { + fake.channelConfigMutex.Lock() + defer fake.channelConfigMutex.Unlock() + fake.ChannelConfigStub = nil + if fake.channelConfigReturnsOnCall == nil { + fake.channelConfigReturnsOnCall = make(map[int]struct { + result1 channelconfig.Channel + }) + } + fake.channelConfigReturnsOnCall[i] = struct { + result1 channelconfig.Channel + }{result1} +} + func (fake *FakeConsenterSupport) ClassifyMsg(arg1 *common.ChannelHeader) msgprocessor.Classification { fake.classifyMsgMutex.Lock() ret, specificReturn := fake.classifyMsgReturnsOnCall[len(fake.classifyMsgArgsForCall)] @@ -1239,6 +1301,8 @@ func (fake *FakeConsenterSupport) Invocations() map[string][][]interface{} { defer fake.blockCutterMutex.RUnlock() fake.chainIDMutex.RLock() defer fake.chainIDMutex.RUnlock() + fake.channelConfigMutex.RLock() + defer fake.channelConfigMutex.RUnlock() fake.classifyMsgMutex.RLock() defer fake.classifyMsgMutex.RUnlock() fake.createNextBlockMutex.RLock() diff --git a/orderer/mocks/common/multichannel/multichannel.go b/orderer/mocks/common/multichannel/multichannel.go index 0b8a3d155cd..48747a5f2c3 100644 --- a/orderer/mocks/common/multichannel/multichannel.go +++ b/orderer/mocks/common/multichannel/multichannel.go @@ -22,6 +22,9 @@ type ConsenterSupport struct { // SharedConfigVal is the value returned by SharedConfig() SharedConfigVal *mockconfig.Orderer + // SharedConfigVal is the value returned by ChannelConfig() + ChannelConfigVal *mockconfig.Channel + // BlockCutterVal is the value returned by BlockCutter() BlockCutterVal *mockblockcutter.Receiver @@ -85,6 +88,11 @@ func (mcs *ConsenterSupport) SharedConfig() channelconfig.Orderer { return mcs.SharedConfigVal } +// ChannelConfig returns ChannelConfigVal +func (mcs *ConsenterSupport) ChannelConfig() channelconfig.Channel { + return mcs.ChannelConfigVal +} + // CreateNextBlock creates a simple block structure with the given data func (mcs *ConsenterSupport) CreateNextBlock(data []*cb.Envelope) *cb.Block { block := protoutil.NewBlock(0, nil)