Skip to content

Commit

Permalink
FAB-15044 Mig-v1 cleanup #3
Browse files Browse the repository at this point in the history
In preparation to the alternative migration design (v2),
revert some of the commits already merged.

Kafka to Raft migration v1 - cleanup #3
kafka2raft green path #3 (of v1)

Clean commit
4950edd

Change-Id: I52a1283e8b2041983f31071bb0a75c582b031a50
Signed-off-by: Yoav Tock <tock@il.ibm.com>
  • Loading branch information
tock-ibm committed Apr 7, 2019
1 parent 22ac9e6 commit 925db23
Show file tree
Hide file tree
Showing 16 changed files with 45 additions and 739 deletions.
64 changes: 0 additions & 64 deletions orderer/common/broadcast/mock/channel_support_registrar.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions orderer/common/multichannel/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ type ChainSupport struct {
consensus.Chain
cutter blockcutter.Receiver
identity.SignerSerializer
// Needed for consensus-type migration: to execute the migration state machine correctly,
// chains need to know if they are system or standard channel.
systemChannel bool
}

func newChainSupport(
Expand Down Expand Up @@ -59,9 +56,6 @@ func newChainSupport(
),
}

// When ConsortiumsConfig exists, it is the system channel
_, cs.systemChannel = ledgerResources.ConsortiumsConfig()

// Set up the msgprocessor
cs.Processor = msgprocessor.NewStandardChannel(cs, msgprocessor.CreateStandardChannelFilters(cs))

Expand Down Expand Up @@ -183,8 +177,3 @@ func (cs *ChainSupport) VerifyBlockSignature(sd []*protoutil.SignedData, envelop
}
return nil
}

// IsSystemChannel returns true if this is the system channel.
func (cs *ChainSupport) IsSystemChannel() bool {
return cs.systemChannel
}
70 changes: 0 additions & 70 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,6 @@ func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
r.consenters = consenters
existingChains := r.ledgerFactory.ChainIDs()

//TODO To initialize after consensus-type migration, it is necessary to identify the system channel and create it first,
// determining the correct consensus-type and the state of the migration. This is needed for recovery, in case the
// migration process crashes before it is committed.

for _, chainID := range existingChains {
rl, err := r.ledgerFactory.GetOrCreate(chainID)
if err != nil {
Expand Down Expand Up @@ -232,10 +228,6 @@ func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader
cs := r.GetChain(chdr.ChannelId)
// New channel creation
if cs == nil {
// Prevent channel creation during consensus-type migration
if r.ConsensusMigrationPending() {
return chdr, true, nil, errors.New("cannot create channel because consensus-type migration is pending")
}
cs = r.systemChannel
}

Expand All @@ -251,68 +243,6 @@ func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader
return chdr, isConfig, cs, nil
}

// ConsensusMigrationPending checks whether consensus-type migration is started on the system channel.
func (r *Registrar) ConsensusMigrationPending() bool {
//Note: systemChannel.MigrationStatus().IsPending() is thread safe, takes a mutex
return r.systemChannel.MigrationStatus().IsPending()
}

// ConsensusMigrationStart checks whether consensus-type migration had started,
// and then marks all standard channels as started.
func (r *Registrar) ConsensusMigrationStart(context uint64) error {
r.lock.Lock()
defer r.lock.Unlock()

for id, chain := range r.chains {
if id != r.systemChannel.ChainID() && chain.MigrationStatus().IsPending() {
return errors.Errorf("cannot start new consensus-type migration because standard channel %s, still pending", id)
}
}

for _, chain := range r.chains {
chain.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_START, context)
}
logger.Debugf("Consensus-type migration: all standard channels marked as started, context=%d", context)

return nil
}

// ConsensusMigrationCommit checks pre-conditions and commits the consensus-type migration.
func (r *Registrar) ConsensusMigrationCommit() error {
r.lock.Lock()
defer r.lock.Unlock()

sysState, sysContext := r.systemChannel.MigrationStatus().StateContext()
if !(sysState == ab.ConsensusType_MIG_STATE_START && sysContext > 0) {
return errors.Errorf("cannot commit consensus-type migration because system channel (%s): state=%s, context=%d (expect: state=%s, context>0)",
r.systemChannel.ChainID(), sysState, sysContext, ab.ConsensusType_MIG_STATE_START)
}

for id, chain := range r.chains {
st, ctx := chain.MigrationStatus().StateContext()
if id == r.systemChannel.ChainID() {
continue
}
if st != ab.ConsensusType_MIG_STATE_CONTEXT {
return errors.Errorf("cannot commit consensus-type migration because standard channel %s, still pending, state=%s", id, st)
}
if ctx != sysContext {
return errors.Errorf("cannot commit consensus-type migration because standard channel %s, bad context=%d, expected=%d", id, ctx, sysContext)
}
}

r.systemChannel.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_COMMIT, sysContext)
logger.Debugf("Consensus-type migration: system channel marked as committed, context=%d", sysContext)

return nil
}

// ConsensusMigrationAbort checks pre-conditions and aborts the consensus-type migration.
func (r *Registrar) ConsensusMigrationAbort() error {
//TODO implement the consensus-type migration abort path
return fmt.Errorf("Not implemented yet")
}

// GetChain retrieves the chain support for a chain if it exists.
func (r *Registrar) GetChain(chainID string) *ChainSupport {
r.lock.RLock()
Expand Down
156 changes: 1 addition & 155 deletions orderer/common/multichannel/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/ledger/blockledger"
ramledger "github.com/hyperledger/fabric/common/ledger/blockledger/ram"
"github.com/hyperledger/fabric/common/ledger/blockledger/ram"
"github.com/hyperledger/fabric/common/metrics/disabled"
mockchannelconfig "github.com/hyperledger/fabric/common/mocks/config"
mockpolicies "github.com/hyperledger/fabric/common/mocks/policies"
Expand All @@ -27,7 +27,6 @@ import (
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

//go:generate counterfeiter -o mocks/signer_serializer.go --fake-name SignerSerializer . signerSerializer
Expand Down Expand Up @@ -202,159 +201,6 @@ func TestNewRegistrar(t *testing.T) {
})
}

// This test essentially brings the entire system up and is ultimately what main.go will replicate,
// doing it on the system and two standard channels.
// Then, it is testing the methods that implement the MigrationController interface,
// used in consensus-type migration.
func Test3ChanConsensusMigrationController(t *testing.T) {
const (
testChainID1 = genesisconfig.TestChainID + "1"
testChainID2 = genesisconfig.TestChainID + "2"
)

var genesisBlockSys *cb.Block
var genesisBlockStd1 *cb.Block
var genesisBlockStd2 *cb.Block

//system channel
confSys := configtxgentest.Load(genesisconfig.SampleInsecureSoloProfile)
genesisBlockSys = encoder.New(confSys).GenesisBlock()
//standard channel, no Consortiums
confStd := configtxgentest.Load(genesisconfig.SampleInsecureSoloProfile)
confStd.Consortiums = nil
genesisBlockStd1 = encoder.New(confStd).GenesisBlockForChannel(testChainID1)
genesisBlockStd2 = encoder.New(confStd).GenesisBlockForChannel(testChainID2)

t.Run("Green path", func(t *testing.T) {
lf, rls := newRAMLedgerAndFactory3Chan(10,
genesisconfig.TestChainID, genesisBlockSys,
testChainID1, genesisBlockStd1,
testChainID2, genesisBlockStd2)

consenters := make(map[string]consensus.Consenter)
consenters[confSys.Orderer.OrdererType] = &mockConsenter{}

manager := NewRegistrar(lf, mockCrypto(), &disabled.Provider{})
manager.Initialize(consenters)

chainSupport := manager.GetChain("Fake")
assert.Nilf(t, chainSupport, "Should not have found a chain that was not created")

chainSupport = manager.GetChain(genesisconfig.TestChainID)
assert.NotNilf(t, chainSupport, "Should have gotten chain which was initialized by ramledger")
assert.True(t, chainSupport.IsSystemChannel())
assert.True(t, !chainSupport.MigrationStatus().IsPending())

testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, genesisconfig.TestChainID, chainSupport, rls[0], t)

//standard channel 1
chainSupport1 := manager.GetChain(testChainID1)
assert.NotNilf(t, chainSupport1, "Should have gotten chain which was initialized by ramledger")
assert.True(t, !chainSupport1.IsSystemChannel())
assert.True(t, !chainSupport1.MigrationStatus().IsPending())

testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, testChainID1, chainSupport1, rls[1], t)

//standard channel 2
chainSupport2 := manager.GetChain(testChainID2)
assert.NotNilf(t, chainSupport2, "Should have gotten chain which was initialized by ramledger")
assert.True(t, !chainSupport2.IsSystemChannel())
assert.True(t, !chainSupport2.MigrationStatus().IsPending())

testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, testChainID2, chainSupport2, rls[2], t)

assert.Equal(t, 3, manager.ChannelsCount(), "Three channels")

// Test MigrationController methods
assert.True(t, !manager.ConsensusMigrationPending())
err := manager.ConsensusMigrationCommit()
assert.EqualError(t, err,
"cannot commit consensus-type migration because system channel (testchainid): state=MIG_STATE_NONE, context=0 (expect: state=MIG_STATE_START, context>0)")

ctx := uint64(5)
err = manager.ConsensusMigrationStart(ctx)
assert.NoError(t, err, "Migration start")
assert.True(t, manager.ConsensusMigrationPending())
assert.True(t, chainSupport.MigrationStatus().IsPending())
assert.True(t, chainSupport1.MigrationStatus().IsPending())
assert.True(t, chainSupport2.MigrationStatus().IsPending())
err = manager.ConsensusMigrationStart(ctx)
// Iteration order is unpredictable... can be 1 or 2
assert.Regexp(t, "cannot start new consensus-type migration because standard channel testchainid[12], still pending", err.Error())

configTx := makeConfigTx(genesisconfig.TestChainID+"Fake", 6)
_, _, _, err = manager.BroadcastChannelSupport(configTx)
assert.Equal(t, "cannot create channel because consensus-type migration is pending", err.Error())

err = manager.ConsensusMigrationCommit()
// Iteration order is unpredictable... can be 1 or 2
assert.Regexp(t, "cannot commit consensus-type migration because standard channel testchainid[12], still pending, state=MIG_STATE_START", err.Error())

chainSupport1.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_CONTEXT, ctx)
assert.True(t, chainSupport1.MigrationStatus().IsPending())
err = manager.ConsensusMigrationCommit()
assert.EqualError(t, err, "cannot commit consensus-type migration because standard channel testchainid2, still pending, state=MIG_STATE_START")

chainSupport2.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_CONTEXT, ctx+1)
assert.True(t, chainSupport2.MigrationStatus().IsPending())

err = manager.ConsensusMigrationCommit()
assert.EqualError(t, err, "cannot commit consensus-type migration because standard channel testchainid2, bad context=6, expected=5")

chainSupport2.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_CONTEXT, ctx)
assert.True(t, chainSupport2.MigrationStatus().IsPending())

err = manager.ConsensusMigrationCommit()
assert.NoError(t, err, "Migration can commit")
})

t.Run("Abort path", func(t *testing.T) {
lf, rls := newRAMLedgerAndFactory3Chan(10,
genesisconfig.TestChainID, genesisBlockSys,
testChainID1, genesisBlockStd1,
testChainID2, genesisBlockStd2)

consenters := make(map[string]consensus.Consenter)
consenters[confSys.Orderer.OrdererType] = &mockConsenter{}

manager := NewRegistrar(lf, mockCrypto(), &disabled.Provider{})
manager.Initialize(consenters)

chainSupport := manager.GetChain("Fake")
assert.Nilf(t, chainSupport, "Should not have found a chain that was not created")

chainSupport = manager.GetChain(genesisconfig.TestChainID)
require.NotNilf(t, chainSupport, "Should have gotten chain which was initialized by ramledger")
testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, genesisconfig.TestChainID, chainSupport, rls[0], t)

//standard channel 1
chainSupport1 := manager.GetChain(testChainID1)
require.NotNilf(t, chainSupport1, "Should have gotten chain which was initialized by ramledger")
testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, testChainID1, chainSupport1, rls[1], t)

//standard channel 2
chainSupport2 := manager.GetChain(testChainID2)
require.NotNilf(t, chainSupport2, "Should have gotten chain which was initialized by ramledger")
testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, testChainID2, chainSupport2, rls[2], t)

assert.Equal(t, 3, manager.ChannelsCount(), "Three channels")

// Test MigrationController methods
ctx := uint64(5)
err := manager.ConsensusMigrationStart(ctx)
assert.NoError(t, err, "Migration start")
assert.True(t, manager.ConsensusMigrationPending())
assert.True(t, chainSupport.MigrationStatus().IsPending())
chainSupport1.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_CONTEXT, ctx)
assert.True(t, chainSupport1.MigrationStatus().IsPending())

err = manager.ConsensusMigrationAbort()
assert.EqualError(t, err, "Not implemented yet")

//TODO test abort path here, when implemented
})
}

func TestCreateChain(t *testing.T) {
//system channel
confSys := configtxgentest.Load(genesisconfig.SampleInsecureSoloProfile)
Expand Down
Loading

0 comments on commit 925db23

Please sign in to comment.