Skip to content

Commit

Permalink
FAB-14700 Kafka2Raft validate broadcast
Browse files Browse the repository at this point in the history
Add methods to validate a config update in broadcast phase.
Validate a proposed config update versus the current config.

Checks the validity of the state transitions of a possible
migration config update tx by comparing the current
ConsensusType config with the next (proposed)
ConsensusType fields. It is called during the broadcast phase
and never changes the status of the underlying chain.

Change-Id: Ib9b7d75f703d7f7f5eb0c3d514e2bfd4073f34f9
Signed-off-by: Yoav Tock <tock@il.ibm.com>
  • Loading branch information
tock-ibm committed Mar 25, 2019
1 parent a0a9189 commit 2d924d0
Show file tree
Hide file tree
Showing 11 changed files with 903 additions and 68 deletions.
3 changes: 3 additions & 0 deletions orderer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ type ConsenterSupport interface {
// SharedConfig provides the shared config from the channel's current config block.
SharedConfig() channelconfig.Orderer

// ChannelConfig provides the channel config from the channel's current config block.
ChannelConfig() channelconfig.Channel

// CreateNextBlock takes a list of messages and creates the next block based on the block with highest block number committed to the ledger
// Note that either WriteBlock or WriteConfigBlock must be called before invoking this method a second time.
CreateNextBlock(messages []*cb.Envelope) *cb.Block
Expand Down
120 changes: 94 additions & 26 deletions orderer/consensus/kafka/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,19 +248,8 @@ func (chain *chainImpl) Configure(config *cb.Envelope, configSeq uint64) error {
}

func (chain *chainImpl) configure(config *cb.Envelope, configSeq uint64, originalOffset int64) error {
// During consensus-type migration, stop channel creation
if chain.ConsenterSupport.IsSystemChannel() && chain.migrationManager.IsPending() {
ordererTx, err := isOrdererTx(config)
if err != nil {
err = errors.Wrap(err, "cannot determine if config-tx is of type ORDERER_TX, on system channel")
logger.Warning(err)
return err
}
if ordererTx {
str := "cannot enqueue, consensus-type migration pending: ORDERER_TX on system channel, blocking channel creation"
logger.Info(str)
return errors.Errorf(str)
}
if err := chain.filterMigration(config); err != nil {
return errors.Wrap(err, "cannot enqueue")
}

marshaledConfig, err := protoutil.Marshal(config)
Expand All @@ -273,6 +262,57 @@ func (chain *chainImpl) configure(config *cb.Envelope, configSeq uint64, origina
return nil
}

// filterMigration filters out config transactions that should be rejected during consensus-type migration.
// Filtering takes into account the global state of migration, not just the state of the current channel.
func (chain *chainImpl) filterMigration(config *cb.Envelope) error {
migEnabled := chain.ChannelConfig().Capabilities().ConsensusTypeMigration() && chain.SharedConfig().Capabilities().Kafka2RaftMigration()
isOrdTx, isOrdConf, ordConf := chain.getOrdererConfig(config)

if chain.ConsenterSupport.IsSystemChannel() && isOrdTx {
if migEnabled && chain.migrationManager.IsStartedOrCommitted() {
return errors.Errorf("consensus-type migration pending: ORDERER_TX on system channel, blocking channel creation")
} else {
return nil
}
}

if !isOrdConf {
return nil
}

if !migEnabled {
if chain.SharedConfig().ConsensusType() != ordConf.ConsensusType() {
return errors.Errorf("attempted to change consensus type from %s to %s",
chain.SharedConfig().ConsensusType(), ordConf.ConsensusType())
}
if ordConf.ConsensusMigrationState() != ab.ConsensusType_MIG_STATE_NONE || ordConf.ConsensusMigrationContext() != 0 {
return errors.Errorf("new config has unexpected consensus-migration state or context: (%s/%d) should be (MIG_STATE_NONE/0)",
ordConf.ConsensusMigrationState(), ordConf.ConsensusMigrationContext())
}

return nil
}

currentInfo := &migration.ConsensusTypeInfo{
Type: chain.SharedConfig().ConsensusType(),
State: chain.SharedConfig().ConsensusMigrationState(),
Context: chain.SharedConfig().ConsensusMigrationContext(),
}

nextInfo := &migration.ConsensusTypeInfo{
Type: ordConf.ConsensusType(),
Metadata: ordConf.ConsensusMetadata(),
State: ordConf.ConsensusMigrationState(),
Context: ordConf.ConsensusMigrationContext(),
}

if valErr := chain.migrationManager.Validate(currentInfo, nextInfo); valErr != nil {
return valErr
}

return chain.migrationManager.CheckAllowed(nextInfo, chain.consenter.migrationController())
}

// enqueue accepts a message and returns true on acceptance, or false otherwise.
func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool {
logger.Debugf("[channel: %s] Enqueueing envelope...", chain.ChainID())
Expand Down Expand Up @@ -1304,27 +1344,55 @@ func getHealthyClusterReplicaInfo(retryOptions localconfig.Retry, haltChan chan
return replicaIDs, getReplicaInfo.retry()
}

// isOrdererTx detects if the config envelope is holding an ORDERER_TX.
// This is only called during consensus-type migration, so the extra work
// (unmarshaling the envelope again) is not that important.
func isOrdererTx(env *cb.Envelope) (bool, error) {
payload, err := protoutil.UnmarshalPayload(env.Payload)
// getOrdererConfig parses the incoming config envelope, classifies it, and returns the OrdererConfig if it exists.
//
// isOrdererTx is true if the config envelope is holding an ORDERER_TRANSACTION.
// isOrdConf is true if the config envelope is holding a CONFIG, and the OrdererConfig exists.
// ordConf carries the OrdererConfig, if it exists.
func (chain *chainImpl) getOrdererConfig(configTx *cb.Envelope) (
isOrdererTx bool, isOrdConf bool, ordConf channelconfig.Orderer) {
payload, err := protoutil.UnmarshalPayload(configTx.Payload)
if err != nil {
return false, err
logger.Errorf("Consensus-type migration: Told to process a config tx, but configtx payload is invalid: %s", err)
return false, false, nil
}

if payload.Header == nil {
return false, errors.Errorf("Abort processing config msg because no head was set")
logger.Errorf("Consensus-type migration: Told to process a config tx, but configtx payload header is missing")
return false, false, nil
}

if payload.Header.ChannelHeader == nil {
return false, errors.Errorf("Abort processing config msg because no channel header was set")
channelHeader, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Errorf("Consensus-type migration: Told to process a config tx with an invalid channel header: %s", err)
return false, false, nil
}

chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return false, errors.Errorf("Abort processing config msg because channel header unmarshalling error: %s", err)
logger.Debugf("[channel: %s] Consensus-type migration: Processing, header: %s", chain.ChainID(), channelHeader.String())

switch channelHeader.Type {

case int32(cb.HeaderType_ORDERER_TRANSACTION):
isOrdererTx = true

case int32(cb.HeaderType_CONFIG):
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
if err != nil {
logger.Errorf("Consensus-type migration: Cannot unmarshal config envelope form payload data: %s", err)
return false, false, nil
}
config := configEnvelope.Config
bundle, err := channelconfig.NewBundle(chain.ChainID(), config)
if err != nil {
logger.Errorf("Consensus-type migration: Cannot create new bundle from Config: %s", err)
return false, false, nil
}

ordConf, isOrdConf = bundle.OrdererConfig()
if !isOrdConf {
logger.Debugf("[channel: %s] Consensus-type migration: Config tx does not have OrdererConfig, ignoring", chain.ChainID())
}
}

return chdr.Type == int32(cb.HeaderType_ORDERER_TRANSACTION), nil
return isOrdererTx, isOrdConf, ordConf
}
23 changes: 20 additions & 3 deletions orderer/consensus/kafka/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func TestChain(t *testing.T) {
ChainIDVal: mockChannel.topic(),
HeightVal: uint64(3),
SharedConfigVal: &mockconfig.Orderer{KafkaBrokersVal: []string{mockBroker.Addr()}},
ChannelConfigVal: &mockconfig.Channel{
CapabilitiesVal: &mockconfig.ChannelCapabilities{
ConsensusTypeMigrationVal: false},
},
}
return
}
Expand Down Expand Up @@ -3036,6 +3040,10 @@ func TestResubmission(t *testing.T) {
ResubmissionVal: true,
},
},
ChannelConfigVal: &mockconfig.Channel{
CapabilitiesVal: &mockconfig.ChannelCapabilities{
ConsensusTypeMigrationVal: false},
},
SequenceVal: uint64(2),
ProcessConfigMsgVal: newMockConfigEnvelope(),
}
Expand Down Expand Up @@ -3193,6 +3201,10 @@ func TestResubmission(t *testing.T) {
ResubmissionVal: true,
},
},
ChannelConfigVal: &mockconfig.Channel{
CapabilitiesVal: &mockconfig.ChannelCapabilities{
ConsensusTypeMigrationVal: false},
},
SequenceVal: uint64(1),
ConfigSeqVal: uint64(1),
ProcessConfigMsgVal: newMockConfigEnvelope(),
Expand Down Expand Up @@ -3440,7 +3452,7 @@ func TestDeliverSession(t *testing.T) {
defer env.broker2.Close()

// initialize consenter
consenter, _ := New(mockLocalConfig, &mockkafka.MetricsProvider{}, &mockkafka.HealthChecker{}, &mockconsensus.FakeMigrationController{})
consenter, _ := New(mockLocalConfig, &mockkafka.MetricsProvider{}, &mockkafka.HealthChecker{}, &mockconsensus.FakeController{})

// initialize chain
metadata := &cb.Metadata{Value: protoutil.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: env.height})}
Expand Down Expand Up @@ -3529,7 +3541,7 @@ func TestDeliverSession(t *testing.T) {
defer env.broker0.Close()

// initialize consenter
consenter, _ := New(mockLocalConfig, &mockkafka.MetricsProvider{}, &mockkafka.HealthChecker{}, &mockconsensus.FakeMigrationController{})
consenter, _ := New(mockLocalConfig, &mockkafka.MetricsProvider{}, &mockkafka.HealthChecker{}, &mockconsensus.FakeController{})

// initialize chain
metadata := &cb.Metadata{Value: protoutil.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: env.height})}
Expand Down Expand Up @@ -3591,7 +3603,7 @@ func TestDeliverSession(t *testing.T) {
defer env.broker0.Close()

// initialize consenter
consenter, _ := New(mockLocalConfig, &mockkafka.MetricsProvider{}, &mockkafka.HealthChecker{}, &mockconsensus.FakeMigrationController{})
consenter, _ := New(mockLocalConfig, &mockkafka.MetricsProvider{}, &mockkafka.HealthChecker{}, &mockconsensus.FakeController{})

// initialize chain
metadata := &cb.Metadata{Value: protoutil.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: env.height})}
Expand Down Expand Up @@ -3759,6 +3771,11 @@ func (c *mockConsenterSupport) SharedConfig() channelconfig.Orderer {
return args.Get(0).(channelconfig.Orderer)
}

func (c *mockConsenterSupport) ChannelConfig() channelconfig.Channel {
args := c.Called()
return args.Get(0).(channelconfig.Channel)
}

func (c *mockConsenterSupport) CreateNextBlock(messages []*cb.Envelope) *cb.Block {
args := c.Called(messages)
return args.Get(0).(*cb.Block)
Expand Down
8 changes: 4 additions & 4 deletions orderer/consensus/kafka/consenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ func init() {
}

func TestNew(t *testing.T) {
c, _ := New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeMigrationController{})
c, _ := New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeController{})
_ = consensus.Consenter(c)
}

func TestHandleChain(t *testing.T) {
consenter, _ := New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeMigrationController{})
consenter, _ := New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeController{})

oldestOffset := int64(0)
newestOffset := int64(5)
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestHandleChain(t *testing.T) {
}

func TestMigration(t *testing.T) {
consenter, _ := New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeMigrationController{})
consenter, _ := New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeController{})
consenterimpl := consenter.(*consenterImpl)
require.NotNil(t, consenterimpl.migrationController())
assert.NoError(t, consenterimpl.migrationController().ConsensusMigrationStart(111))
Expand All @@ -121,7 +121,7 @@ func TestMigration(t *testing.T) {

mockLocalConfig.General.GenesisFile = "abc.genesis.block"
mockLocalConfig.General.GenesisMethod = "file"
consenter, _ = New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeMigrationController{})
consenter, _ = New(mockLocalConfig, &mock.MetricsProvider{}, &mock.HealthChecker{}, &mockconsensus.FakeController{})
consenterimpl = consenter.(*consenterImpl)
assert.Equal(t, mockLocalConfig.General.GenesisFile, consenterimpl.bootstrapFile())
}
Expand Down
18 changes: 16 additions & 2 deletions orderer/consensus/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type Status interface {
// The definition of "committed" differs between the system and standard channels.
// Returns true when: COMMIT on system channel; always false on standard channel.
IsCommitted() bool

// IsStartedOrCommitted returns true if consensus-type migration is started or committed on the underlying chain.
IsStartedOrCommitted() bool
}

// ConsensusTypeInfo carries the fields of protos/orderer/ConsensusType that are contained in a proposed or ordered
Expand Down Expand Up @@ -147,6 +150,18 @@ func (sm *statusManager) SetStateContext(state orderer.ConsensusType_MigrationSt
sm.context = context
}

// IsStartedOrCommitted returns true if migration is started or committed.
func (sm *statusManager) IsStartedOrCommitted() bool {
sm.mutex.Lock()
defer sm.mutex.Unlock()

if sm.systemChannel {
return sm.state == orderer.ConsensusType_MIG_STATE_START || sm.state == orderer.ConsensusType_MIG_STATE_COMMIT
}

return sm.state == orderer.ConsensusType_MIG_STATE_START || sm.state == orderer.ConsensusType_MIG_STATE_CONTEXT
}

// IsPending returns true if migration is pending.
func (sm *statusManager) IsPending() bool {
sm.mutex.Lock()
Expand Down Expand Up @@ -378,6 +393,5 @@ func (sm *statusManager) VerifyRaftMetadata(metadataBytes []byte) error {
// Validate checks the validity of the state transitions of a possible migration config update tx by comparing the
// current ConsensusTypeInfo config with the next (proposed) ConsensusTypeInfo.
func (sm *statusManager) Validate(current, next *ConsensusTypeInfo) error {
//TODO
return nil
return Validate(sm.systemChannel, current, next)
}
Loading

0 comments on commit 2d924d0

Please sign in to comment.