Skip to content

Commit

Permalink
[Backport] #2936 to release-2.2 (#2953)
Browse files Browse the repository at this point in the history
* do not create new chain of type etcdraft.Chain if such exists in map of chains. This can happen when in Raft protocol a channel was created, but not marked as done in WAL logs, so at orderer startup it will try to rerun creation tx and panic because the channel already exists.

Signed-off-by: Vladyslav Kopaihorodskyi <vlad.kopaygorodsky@gmail.com>

* Reverted to single newChain method to produce less changes in backport.

Signed-off-by: Vladyslav Kopaihorodskyi <vlad.kopaygorodsky@gmail.com>
  • Loading branch information
kopaygorodsky authored Sep 29, 2021
1 parent eddb470 commit acf88a0
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 1 deletion.
43 changes: 42 additions & 1 deletion orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,10 @@ func (r *Registrar) CreateChain(chainName string) {
if chain != nil {
logger.Infof("A chain of type %T for channel %s already exists. "+
"Halting it.", chain.Chain, chainName)
r.lock.Lock()
chain.Halt()
delete(r.chains, chainName)
r.lock.Unlock()
}
r.newChain(configTx(lf))
}
Expand All @@ -357,14 +360,30 @@ func (r *Registrar) newChain(configtx *cb.Envelope) {
r.lock.Lock()
defer r.lock.Unlock()

channelName, err := channelNameFromConfigTx(configtx)
if err != nil {
logger.Warnf("Failed extracting channel name: %v", err)
return
}

// fixes https://github.com/hyperledger/fabric/issues/2931
if existingChain, exists := r.chains[channelName]; exists {
if raftChain, isRaftChain := existingChain.Chain.(RaftChain); isRaftChain && raftChain.IsRaft() {
logger.Infof("Channel %s already created, skipping its creation", channelName)
return
}
}

ledgerResources, err := r.newLedgerResources(configtx)
if err != nil {
logger.Panicf("Error creating ledger resources: %s", err)
}

// If we have no blocks, we need to create the genesis block ourselves.
if ledgerResources.Height() == 0 {
ledgerResources.Append(blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx}))
if err := ledgerResources.Append(blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx})); err != nil {
logger.Panicf("Error appending genesis block to ledger: %s", err)
}
}
cs, err := newChainSupport(r, ledgerResources, r.consenters, r.signer, r.blockcutterMetrics, r.bccsp)
if err != nil {
Expand Down Expand Up @@ -494,3 +513,25 @@ func (r *Registrar) RemoveChannel(channelID string, removeStorage bool) error {
//TODO
return errors.New("Not implemented yet")
}

type RaftChain interface {
IsRaft() bool
}

func channelNameFromConfigTx(configtx *cb.Envelope) (string, error) {
payload, err := protoutil.UnmarshalPayload(configtx.Payload)
if err != nil {
return "", errors.WithMessage(err, "error umarshaling envelope to payload")
}

if payload.Header == nil {
return "", errors.New("missing channel header")
}

chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return "", errors.WithMessage(err, "error unmarshalling channel header")
}

return chdr.ChannelId, nil
}
61 changes: 61 additions & 0 deletions orderer/common/multichannel/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,38 @@ func TestRegistrar_JoinChannel(t *testing.T) {
// After creating the chain, it exists
assert.NotNil(t, registrar.GetChain("my-raft-channel"))
})

t.Run("chain of type etcdraft.Chain is already created", func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "registrar_test-")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)

ledgerFactory, _ := newLedgerAndFactory(tmpdir, "", nil)
mockConsenters := map[string]consensus.Consenter{confSys.Orderer.OrdererType: &mockConsenter{}, "etcdraft": &mockConsenter{}}
config := localconfig.TopLevel{}
config.General.BootstrapMethod = "none"
config.General.GenesisFile = ""
registrar := NewRegistrar(config, ledgerFactory, mockCrypto(), &disabled.Provider{}, cryptoProvider)
registrar.Initialize(mockConsenters)

ledger, err := ledgerFactory.GetOrCreate("my-channel")
assert.NoError(t, err)
ledger.Append(genesisBlockApp)

testChainSupport := &ChainSupport{Chain: &mockRaftChain{}}
registrar.chains["test"] = testChainSupport

orglessChannelConf := genesisconfig.Load(genesisconfig.SampleSingleMSPChannelProfile, configtest.GetDevConfigDir())
envConfigUpdate, err := encoder.MakeChannelCreationTransaction("test", mockCrypto(), orglessChannelConf)
require.NoError(t, err, "Constructing chain creation tx")

registrar.newChain(envConfigUpdate)

testChainSupport2 := registrar.GetChain("test")
require.NotNil(t, testChainSupport2)

assert.Same(t, testChainSupport, testChainSupport2)
})
}

func generateCertificates(t *testing.T, confAppRaft *genesisconfig.Profile, tlsCA tlsgen.CA, certDir string) {
Expand Down Expand Up @@ -728,3 +760,32 @@ func TestRegistrar_ConfigBlockOrPanic(t *testing.T) {
assert.Equal(t, genesisBlockSys.Data, cBlock.Data)
})
}

type mockRaftChain struct {
}

func (c *mockRaftChain) Order(env *cb.Envelope, configSeq uint64) error {
return nil
}

func (c *mockRaftChain) Configure(config *cb.Envelope, configSeq uint64) error {
return nil
}

func (c *mockRaftChain) WaitReady() error {
return nil
}

func (c *mockRaftChain) Errored() <-chan struct{} {
return nil
}

func (c *mockRaftChain) Start() {
}

func (c *mockRaftChain) Halt() {
}

func (c *mockRaftChain) IsRaft() bool {
return true
}
4 changes: 4 additions & 0 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1433,3 +1433,7 @@ func (c *Chain) triggerCatchup(sn *raftpb.Snapshot) {
case <-c.doneC:
}
}

func (c *Chain) IsRaft() bool {
return true
}

0 comments on commit acf88a0

Please sign in to comment.