diff --git a/orderer/common/multichannel/registrar.go b/orderer/common/multichannel/registrar.go index af068479317..88e52da37bb 100644 --- a/orderer/common/multichannel/registrar.go +++ b/orderer/common/multichannel/registrar.go @@ -32,6 +32,7 @@ import ( "github.com/hyperledger/fabric/orderer/common/msgprocessor" "github.com/hyperledger/fabric/orderer/common/types" "github.com/hyperledger/fabric/orderer/consensus" + "github.com/hyperledger/fabric/orderer/consensus/etcdraft" "github.com/hyperledger/fabric/protoutil" "github.com/pkg/errors" ) @@ -537,7 +538,10 @@ func (r *Registrar) CreateChain(chainName string) { if chain != nil { logger.Infof("A chain of type %T for channel %s already exists. "+ "Halting it.", chain.Chain, chainName) + r.lock.Lock() chain.Halt() + delete(r.chains, chainName) + r.lock.Unlock() } r.newChain(configTx(lf)) } @@ -546,6 +550,20 @@ func (r *Registrar) newChain(configtx *cb.Envelope) { r.lock.Lock() defer r.lock.Unlock() + channelName, err := channelNameFromConfigTx(configtx) + if err != nil { + logger.Warnf("Failed extracting channel name: %v", err) + return + } + + // fixes https://github.com/hyperledger/fabric/issues/2931 + if existingChain, exists := r.chains[channelName]; exists { + if _, isRaftChain := existingChain.Chain.(*etcdraft.Chain); isRaftChain { + logger.Infof("Channel %s already created, skipping its creation", channelName) + return + } + } + cs := r.createNewChain(configtx) cs.start() logger.Infof("Created and started new channel %s", cs.ChannelID()) @@ -1113,3 +1131,21 @@ func (r *Registrar) ReportConsensusRelationAndStatusMetrics(channelID string, re r.channelParticipationMetrics.reportConsensusRelation(channelID, relation) r.channelParticipationMetrics.reportStatus(channelID, status) } + +func channelNameFromConfigTx(configtx *cb.Envelope) (string, error) { + payload, err := protoutil.UnmarshalPayload(configtx.Payload) + if err != nil { + return "", errors.WithMessage(err, "error umarshaling envelope to payload") + } + + if payload.Header == nil { + return "", errors.New("missing channel header") + } + + chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader) + if err != nil { + return "", errors.WithMessage(err, "error unmarshalling channel header") + } + + return chdr.ChannelId, nil +} diff --git a/orderer/common/multichannel/registrar_test.go b/orderer/common/multichannel/registrar_test.go index 9c09cea0d4c..cea2c7a65f2 100644 --- a/orderer/common/multichannel/registrar_test.go +++ b/orderer/common/multichannel/registrar_test.go @@ -38,6 +38,7 @@ import ( "github.com/hyperledger/fabric/orderer/common/multichannel/mocks" "github.com/hyperledger/fabric/orderer/common/types" "github.com/hyperledger/fabric/orderer/consensus" + "github.com/hyperledger/fabric/orderer/consensus/etcdraft" "github.com/hyperledger/fabric/protoutil" "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -661,6 +662,35 @@ func TestCreateChain(t *testing.T) { close(chain2.Chain.(*mockChainCluster).queue) }) + t.Run("chain of type etcdraft.Chain is already created", func(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "registrar_test-") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + lf, _ := newLedgerAndFactory(tmpdir, "testchannelid", genesisBlockSys) + + consenter := &mocks.Consenter{} + consenter.HandleChainCalls(handleChain) + consenters := map[string]consensus.Consenter{confSys.Orderer.OrdererType: consenter} + + manager := NewRegistrar(localconfig.TopLevel{}, lf, mockCrypto(), &disabled.Provider{}, cryptoProvider, nil) + manager.Initialize(consenters) + + testChainSupport := &ChainSupport{Chain: &etcdraft.Chain{}} + manager.chains["test"] = testChainSupport + + orglessChannelConf := genesisconfig.Load(genesisconfig.SampleSingleMSPChannelProfile, configtest.GetDevConfigDir()) + envConfigUpdate, err := encoder.MakeChannelCreationTransaction("test", mockCrypto(), orglessChannelConf) + require.NoError(t, err, "Constructing chain creation tx") + + manager.newChain(envConfigUpdate) + + testChainSupport2 := manager.GetChain("test") + require.NotNil(t, testChainSupport2) + + assert.Same(t, testChainSupport, testChainSupport2) + }) + // This test brings up the entire system, with the mock consenter, including the broadcasters etc. and creates a new chain t.Run("New chain", func(t *testing.T) { expectedLastConfigSeq := uint64(1)