Skip to content

Commit

Permalink
Do not create new chain of type etcdraft.Chain if such exists in map …
Browse files Browse the repository at this point in the history
…of chains. This can happen when in Raft protocol a channel was created, but not marked as done in WAL logs, so at orderer startup it will try to rerun creation tx and panic because the channel already exists.

Signed-off-by: Vladyslav Kopaihorodskyi <vlad.kopaygorodsky@gmail.com>
  • Loading branch information
kopaygorodsky authored and denyeart committed Sep 20, 2021
1 parent 8999ce7 commit aa8d06b
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
36 changes: 36 additions & 0 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -537,7 +538,10 @@ func (r *Registrar) CreateChain(chainName string) {
if chain != nil {
logger.Infof("A chain of type %T for channel %s already exists. "+
"Halting it.", chain.Chain, chainName)
r.lock.Lock()
chain.Halt()
delete(r.chains, chainName)
r.lock.Unlock()
}
r.newChain(configTx(lf))
}
Expand All @@ -546,6 +550,20 @@ func (r *Registrar) newChain(configtx *cb.Envelope) {
r.lock.Lock()
defer r.lock.Unlock()

channelName, err := channelNameFromConfigTx(configtx)
if err != nil {
logger.Warnf("Failed extracting channel name: %v", err)
return
}

// fixes https://github.com/hyperledger/fabric/issues/2931
if existingChain, exists := r.chains[channelName]; exists {
if _, isRaftChain := existingChain.Chain.(*etcdraft.Chain); isRaftChain {
logger.Infof("Channel %s already created, skipping its creation", channelName)
return
}
}

cs := r.createNewChain(configtx)
cs.start()
logger.Infof("Created and started new channel %s", cs.ChannelID())
Expand Down Expand Up @@ -1113,3 +1131,21 @@ func (r *Registrar) ReportConsensusRelationAndStatusMetrics(channelID string, re
r.channelParticipationMetrics.reportConsensusRelation(channelID, relation)
r.channelParticipationMetrics.reportStatus(channelID, status)
}

func channelNameFromConfigTx(configtx *cb.Envelope) (string, error) {
payload, err := protoutil.UnmarshalPayload(configtx.Payload)
if err != nil {
return "", errors.WithMessage(err, "error umarshaling envelope to payload")
}

if payload.Header == nil {
return "", errors.New("missing channel header")
}

chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return "", errors.WithMessage(err, "error unmarshalling channel header")
}

return chdr.ChannelId, nil
}
30 changes: 30 additions & 0 deletions orderer/common/multichannel/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/hyperledger/fabric/orderer/common/multichannel/mocks"
"github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -661,6 +662,35 @@ func TestCreateChain(t *testing.T) {
close(chain2.Chain.(*mockChainCluster).queue)
})

t.Run("chain of type etcdraft.Chain is already created", func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "registrar_test-")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)

lf, _ := newLedgerAndFactory(tmpdir, "testchannelid", genesisBlockSys)

consenter := &mocks.Consenter{}
consenter.HandleChainCalls(handleChain)
consenters := map[string]consensus.Consenter{confSys.Orderer.OrdererType: consenter}

manager := NewRegistrar(localconfig.TopLevel{}, lf, mockCrypto(), &disabled.Provider{}, cryptoProvider, nil)
manager.Initialize(consenters)

testChainSupport := &ChainSupport{Chain: &etcdraft.Chain{}}
manager.chains["test"] = testChainSupport

orglessChannelConf := genesisconfig.Load(genesisconfig.SampleSingleMSPChannelProfile, configtest.GetDevConfigDir())
envConfigUpdate, err := encoder.MakeChannelCreationTransaction("test", mockCrypto(), orglessChannelConf)
require.NoError(t, err, "Constructing chain creation tx")

manager.newChain(envConfigUpdate)

testChainSupport2 := manager.GetChain("test")
require.NotNil(t, testChainSupport2)

assert.Same(t, testChainSupport, testChainSupport2)
})

// This test brings up the entire system, with the mock consenter, including the broadcasters etc. and creates a new chain
t.Run("New chain", func(t *testing.T) {
expectedLastConfigSeq := uint64(1)
Expand Down

0 comments on commit aa8d06b

Please sign in to comment.