Skip to content

Commit

Permalink
Merge changes from topic 'FAB-15045-Mig-v1-cleanup-4'
Browse files Browse the repository at this point in the history
* changes:
  FAB-15045 Mig-v1 cleanup #4
  FAB-15044 Mig-v1 cleanup #3
  • Loading branch information
yacovm authored and Gerrit Code Review committed Apr 11, 2019
2 parents c28d6dc + 0794b78 commit f18b91a
Show file tree
Hide file tree
Showing 21 changed files with 45 additions and 2,554 deletions.
64 changes: 0 additions & 64 deletions orderer/common/broadcast/mock/channel_support_registrar.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions orderer/common/multichannel/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ type ChainSupport struct {
consensus.Chain
cutter blockcutter.Receiver
identity.SignerSerializer
// Needed for consensus-type migration: to execute the migration state machine correctly,
// chains need to know if they are system or standard channel.
systemChannel bool
}

func newChainSupport(
Expand Down Expand Up @@ -59,9 +56,6 @@ func newChainSupport(
),
}

// When ConsortiumsConfig exists, it is the system channel
_, cs.systemChannel = ledgerResources.ConsortiumsConfig()

// Set up the msgprocessor
cs.Processor = msgprocessor.NewStandardChannel(cs, msgprocessor.CreateStandardChannelFilters(cs))

Expand Down Expand Up @@ -183,8 +177,3 @@ func (cs *ChainSupport) VerifyBlockSignature(sd []*protoutil.SignedData, envelop
}
return nil
}

// IsSystemChannel returns true if this is the system channel.
func (cs *ChainSupport) IsSystemChannel() bool {
return cs.systemChannel
}
70 changes: 0 additions & 70 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,6 @@ func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
r.consenters = consenters
existingChains := r.ledgerFactory.ChainIDs()

//TODO To initialize after consensus-type migration, it is necessary to identify the system channel and create it first,
// determining the correct consensus-type and the state of the migration. This is needed for recovery, in case the
// migration process crashes before it is committed.

for _, chainID := range existingChains {
rl, err := r.ledgerFactory.GetOrCreate(chainID)
if err != nil {
Expand Down Expand Up @@ -232,10 +228,6 @@ func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader
cs := r.GetChain(chdr.ChannelId)
// New channel creation
if cs == nil {
// Prevent channel creation during consensus-type migration
if r.ConsensusMigrationPending() {
return chdr, true, nil, errors.New("cannot create channel because consensus-type migration is pending")
}
cs = r.systemChannel
}

Expand All @@ -251,68 +243,6 @@ func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader
return chdr, isConfig, cs, nil
}

// ConsensusMigrationPending checks whether consensus-type migration is started on the system channel.
func (r *Registrar) ConsensusMigrationPending() bool {
//Note: systemChannel.MigrationStatus().IsPending() is thread safe, takes a mutex
return r.systemChannel.MigrationStatus().IsPending()
}

// ConsensusMigrationStart checks whether consensus-type migration had started,
// and then marks all standard channels as started.
func (r *Registrar) ConsensusMigrationStart(context uint64) error {
r.lock.Lock()
defer r.lock.Unlock()

for id, chain := range r.chains {
if id != r.systemChannel.ChainID() && chain.MigrationStatus().IsPending() {
return errors.Errorf("cannot start new consensus-type migration because standard channel %s, still pending", id)
}
}

for _, chain := range r.chains {
chain.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_START, context)
}
logger.Debugf("Consensus-type migration: all standard channels marked as started, context=%d", context)

return nil
}

// ConsensusMigrationCommit checks pre-conditions and commits the consensus-type migration.
func (r *Registrar) ConsensusMigrationCommit() error {
r.lock.Lock()
defer r.lock.Unlock()

sysState, sysContext := r.systemChannel.MigrationStatus().StateContext()
if !(sysState == ab.ConsensusType_MIG_STATE_START && sysContext > 0) {
return errors.Errorf("cannot commit consensus-type migration because system channel (%s): state=%s, context=%d (expect: state=%s, context>0)",
r.systemChannel.ChainID(), sysState, sysContext, ab.ConsensusType_MIG_STATE_START)
}

for id, chain := range r.chains {
st, ctx := chain.MigrationStatus().StateContext()
if id == r.systemChannel.ChainID() {
continue
}
if st != ab.ConsensusType_MIG_STATE_CONTEXT {
return errors.Errorf("cannot commit consensus-type migration because standard channel %s, still pending, state=%s", id, st)
}
if ctx != sysContext {
return errors.Errorf("cannot commit consensus-type migration because standard channel %s, bad context=%d, expected=%d", id, ctx, sysContext)
}
}

r.systemChannel.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_COMMIT, sysContext)
logger.Debugf("Consensus-type migration: system channel marked as committed, context=%d", sysContext)

return nil
}

// ConsensusMigrationAbort checks pre-conditions and aborts the consensus-type migration.
func (r *Registrar) ConsensusMigrationAbort() error {
//TODO implement the consensus-type migration abort path
return fmt.Errorf("Not implemented yet")
}

// GetChain retrieves the chain support for a chain if it exists.
func (r *Registrar) GetChain(chainID string) *ChainSupport {
r.lock.RLock()
Expand Down
156 changes: 1 addition & 155 deletions orderer/common/multichannel/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/ledger/blockledger"
ramledger "github.com/hyperledger/fabric/common/ledger/blockledger/ram"
"github.com/hyperledger/fabric/common/ledger/blockledger/ram"
"github.com/hyperledger/fabric/common/metrics/disabled"
mockchannelconfig "github.com/hyperledger/fabric/common/mocks/config"
mockpolicies "github.com/hyperledger/fabric/common/mocks/policies"
Expand All @@ -27,7 +27,6 @@ import (
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

//go:generate counterfeiter -o mocks/signer_serializer.go --fake-name SignerSerializer . signerSerializer
Expand Down Expand Up @@ -202,159 +201,6 @@ func TestNewRegistrar(t *testing.T) {
})
}

// This test essentially brings the entire system up and is ultimately what main.go will replicate,
// doing it on the system and two standard channels.
// Then, it is testing the methods that implement the MigrationController interface,
// used in consensus-type migration.
func Test3ChanConsensusMigrationController(t *testing.T) {
const (
testChainID1 = genesisconfig.TestChainID + "1"
testChainID2 = genesisconfig.TestChainID + "2"
)

var genesisBlockSys *cb.Block
var genesisBlockStd1 *cb.Block
var genesisBlockStd2 *cb.Block

//system channel
confSys := configtxgentest.Load(genesisconfig.SampleInsecureSoloProfile)
genesisBlockSys = encoder.New(confSys).GenesisBlock()
//standard channel, no Consortiums
confStd := configtxgentest.Load(genesisconfig.SampleInsecureSoloProfile)
confStd.Consortiums = nil
genesisBlockStd1 = encoder.New(confStd).GenesisBlockForChannel(testChainID1)
genesisBlockStd2 = encoder.New(confStd).GenesisBlockForChannel(testChainID2)

t.Run("Green path", func(t *testing.T) {
lf, rls := newRAMLedgerAndFactory3Chan(10,
genesisconfig.TestChainID, genesisBlockSys,
testChainID1, genesisBlockStd1,
testChainID2, genesisBlockStd2)

consenters := make(map[string]consensus.Consenter)
consenters[confSys.Orderer.OrdererType] = &mockConsenter{}

manager := NewRegistrar(lf, mockCrypto(), &disabled.Provider{})
manager.Initialize(consenters)

chainSupport := manager.GetChain("Fake")
assert.Nilf(t, chainSupport, "Should not have found a chain that was not created")

chainSupport = manager.GetChain(genesisconfig.TestChainID)
assert.NotNilf(t, chainSupport, "Should have gotten chain which was initialized by ramledger")
assert.True(t, chainSupport.IsSystemChannel())
assert.True(t, !chainSupport.MigrationStatus().IsPending())

testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, genesisconfig.TestChainID, chainSupport, rls[0], t)

//standard channel 1
chainSupport1 := manager.GetChain(testChainID1)
assert.NotNilf(t, chainSupport1, "Should have gotten chain which was initialized by ramledger")
assert.True(t, !chainSupport1.IsSystemChannel())
assert.True(t, !chainSupport1.MigrationStatus().IsPending())

testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, testChainID1, chainSupport1, rls[1], t)

//standard channel 2
chainSupport2 := manager.GetChain(testChainID2)
assert.NotNilf(t, chainSupport2, "Should have gotten chain which was initialized by ramledger")
assert.True(t, !chainSupport2.IsSystemChannel())
assert.True(t, !chainSupport2.MigrationStatus().IsPending())

testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, testChainID2, chainSupport2, rls[2], t)

assert.Equal(t, 3, manager.ChannelsCount(), "Three channels")

// Test MigrationController methods
assert.True(t, !manager.ConsensusMigrationPending())
err := manager.ConsensusMigrationCommit()
assert.EqualError(t, err,
"cannot commit consensus-type migration because system channel (testchainid): state=MIG_STATE_NONE, context=0 (expect: state=MIG_STATE_START, context>0)")

ctx := uint64(5)
err = manager.ConsensusMigrationStart(ctx)
assert.NoError(t, err, "Migration start")
assert.True(t, manager.ConsensusMigrationPending())
assert.True(t, chainSupport.MigrationStatus().IsPending())
assert.True(t, chainSupport1.MigrationStatus().IsPending())
assert.True(t, chainSupport2.MigrationStatus().IsPending())
err = manager.ConsensusMigrationStart(ctx)
// Iteration order is unpredictable... can be 1 or 2
assert.Regexp(t, "cannot start new consensus-type migration because standard channel testchainid[12], still pending", err.Error())

configTx := makeConfigTx(genesisconfig.TestChainID+"Fake", 6)
_, _, _, err = manager.BroadcastChannelSupport(configTx)
assert.Equal(t, "cannot create channel because consensus-type migration is pending", err.Error())

err = manager.ConsensusMigrationCommit()
// Iteration order is unpredictable... can be 1 or 2
assert.Regexp(t, "cannot commit consensus-type migration because standard channel testchainid[12], still pending, state=MIG_STATE_START", err.Error())

chainSupport1.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_CONTEXT, ctx)
assert.True(t, chainSupport1.MigrationStatus().IsPending())
err = manager.ConsensusMigrationCommit()
assert.EqualError(t, err, "cannot commit consensus-type migration because standard channel testchainid2, still pending, state=MIG_STATE_START")

chainSupport2.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_CONTEXT, ctx+1)
assert.True(t, chainSupport2.MigrationStatus().IsPending())

err = manager.ConsensusMigrationCommit()
assert.EqualError(t, err, "cannot commit consensus-type migration because standard channel testchainid2, bad context=6, expected=5")

chainSupport2.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_CONTEXT, ctx)
assert.True(t, chainSupport2.MigrationStatus().IsPending())

err = manager.ConsensusMigrationCommit()
assert.NoError(t, err, "Migration can commit")
})

t.Run("Abort path", func(t *testing.T) {
lf, rls := newRAMLedgerAndFactory3Chan(10,
genesisconfig.TestChainID, genesisBlockSys,
testChainID1, genesisBlockStd1,
testChainID2, genesisBlockStd2)

consenters := make(map[string]consensus.Consenter)
consenters[confSys.Orderer.OrdererType] = &mockConsenter{}

manager := NewRegistrar(lf, mockCrypto(), &disabled.Provider{})
manager.Initialize(consenters)

chainSupport := manager.GetChain("Fake")
assert.Nilf(t, chainSupport, "Should not have found a chain that was not created")

chainSupport = manager.GetChain(genesisconfig.TestChainID)
require.NotNilf(t, chainSupport, "Should have gotten chain which was initialized by ramledger")
testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, genesisconfig.TestChainID, chainSupport, rls[0], t)

//standard channel 1
chainSupport1 := manager.GetChain(testChainID1)
require.NotNilf(t, chainSupport1, "Should have gotten chain which was initialized by ramledger")
testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, testChainID1, chainSupport1, rls[1], t)

//standard channel 2
chainSupport2 := manager.GetChain(testChainID2)
require.NotNilf(t, chainSupport2, "Should have gotten chain which was initialized by ramledger")
testMessageOrderAndRetrieval(confSys.Orderer.BatchSize.MaxMessageCount, testChainID2, chainSupport2, rls[2], t)

assert.Equal(t, 3, manager.ChannelsCount(), "Three channels")

// Test MigrationController methods
ctx := uint64(5)
err := manager.ConsensusMigrationStart(ctx)
assert.NoError(t, err, "Migration start")
assert.True(t, manager.ConsensusMigrationPending())
assert.True(t, chainSupport.MigrationStatus().IsPending())
chainSupport1.MigrationStatus().SetStateContext(ab.ConsensusType_MIG_STATE_CONTEXT, ctx)
assert.True(t, chainSupport1.MigrationStatus().IsPending())

err = manager.ConsensusMigrationAbort()
assert.EqualError(t, err, "Not implemented yet")

//TODO test abort path here, when implemented
})
}

func TestCreateChain(t *testing.T) {
//system channel
confSys := configtxgentest.Load(genesisconfig.SampleInsecureSoloProfile)
Expand Down
Loading

0 comments on commit f18b91a

Please sign in to comment.