Skip to content

Commit

Permalink
[FAB-18521] Replicate block metadata with block while OSN catching up (
Browse files Browse the repository at this point in the history
…#2748)

While OSN catches up replicating block from the up-to-date replica the
metadata information omitted, i.e.

```
c.support.WriteBlock(block, nil)
```

where `nil` substitutes for block's metadata.

In this commit, the consenters metadata extracted from the replicated
block and being written with the block.

Signed-off-by: Artem Barger <artem@bargr.net>
(cherry picked from commit 44ab2bf)

# Conflicts:
#	integration/raft/cft_test.go
  • Loading branch information
C0rWin authored and mergify-bot committed Jul 15, 2021
1 parent 8fd2ad8 commit 29cbe91
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 6 deletions.
215 changes: 211 additions & 4 deletions integration/raft/cft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ import (
"github.com/hyperledger/fabric-config/configtx/orderer"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/msp"
<<<<<<< HEAD
"github.com/hyperledger/fabric/cmd/common/signer"
=======
"github.com/hyperledger/fabric-protos-go/orderer/etcdraft"
>>>>>>> 44ab2bf96 ([FAB-18521] Replicate block metadata with block while OSN catching up (#2748))
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/util"
intconftx "github.com/hyperledger/fabric/integration/configtx"
Expand Down Expand Up @@ -234,12 +238,215 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
env = CreateBroadcastEnvelope(network, o1, channelID, make([]byte, 1000))
resp, err := nwo.Broadcast(network, o1, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
Eventually(resp.Status, network.EventuallyTimeout).Should(Equal(common.Status_SUCCESS))

blko1 := FetchBlock(network, o1, 5, channelID)
blko2 := FetchBlock(network, o2, 5, channelID)
for i := 1; i <= 5; i++ {
blko1 := FetchBlock(network, o1, uint64(i), channelID)
blko2 := FetchBlock(network, o2, uint64(i), channelID)

Expect(blko1.Header.DataHash).To(Equal(blko2.Header.DataHash))
Expect(blko1.Header.DataHash).To(Equal(blko2.Header.DataHash))
metao1, err := protoutil.GetConsenterMetadataFromBlock(blko1)
Expect(err).NotTo(HaveOccurred())
metao2, err := protoutil.GetConsenterMetadataFromBlock(blko2)
Expect(err).NotTo(HaveOccurred())

bmo1 := &etcdraft.BlockMetadata{}
proto.Unmarshal(metao1.Value, bmo1)
bmo2 := &etcdraft.BlockMetadata{}
proto.Unmarshal(metao2.Value, bmo2)

Expect(bmo2).To(Equal(bmo1))
}
})

It("catches up and replicates consenters metadata", func() {
network = nwo.New(nwo.MultiNodeEtcdRaft(), testDir, client, StartPort(), components)
orderers := []*nwo.Orderer{network.Orderer("orderer1"), network.Orderer("orderer2"), network.Orderer("orderer3")}
peer = network.Peer("Org1", "peer0")

network.GenerateConfigTree()
network.Bootstrap()

ordererRunners := []*ginkgomon.Runner{}
orderersMembers := grouper.Members{}
for _, o := range orderers {
runner := network.OrdererRunner(o)
ordererRunners = append(ordererRunners, runner)
orderersMembers = append(orderersMembers, grouper.Member{
Name: o.ID(),
Runner: runner,
})
}

By("Starting ordering service cluster")
ordererGroup := grouper.NewParallel(syscall.SIGTERM, orderersMembers)
ordererProc = ifrit.Invoke(ordererGroup)
Eventually(ordererProc.Ready(), network.EventuallyTimeout).Should(BeClosed())

By("Setting up new OSN to be added to the cluster")
o4 := &nwo.Orderer{
Name: "orderer4",
Organization: "OrdererOrg",
}
ports := nwo.Ports{}
for _, portName := range nwo.OrdererPortNames() {
ports[portName] = network.ReservePort()
}

network.PortsByOrdererID[o4.ID()] = ports
network.Orderers = append(network.Orderers, o4)
network.GenerateOrdererConfig(o4)
extendNetwork(network)

ordererCertificatePath := filepath.Join(network.OrdererLocalTLSDir(o4), "server.crt")
ordererCert, err := ioutil.ReadFile(ordererCertificatePath)
Expect(err).NotTo(HaveOccurred())

By("Adding new ordering service node")
addConsenter(network, peer, orderers[0], "systemchannel", etcdraft.Consenter{
ServerTlsCert: ordererCert,
ClientTlsCert: ordererCert,
Host: "127.0.0.1",
Port: uint32(network.OrdererPort(o4, nwo.ClusterPort)),
})

// Get the last config block of the system channel
configBlock := nwo.GetConfigBlock(network, peer, orderers[0], "systemchannel")
// Plant it in the file system of orderer, the new node to be onboarded.
err = ioutil.WriteFile(filepath.Join(testDir, "systemchannel_block.pb"), protoutil.MarshalOrPanic(configBlock), 0o644)

Expect(err).NotTo(HaveOccurred())
By("Starting new ordering service node")
r4 := network.OrdererRunner(o4)
orderers = append(orderers, o4)
ordererRunners = append(ordererRunners, r4)
o4process := ifrit.Invoke(r4)
Eventually(o4process.Ready(), network.EventuallyTimeout).Should(BeClosed())

By("Pick ordering service node to be evicted")
victimIdx := findLeader(ordererRunners) - 1
victim := orderers[victimIdx]
victimCertBytes, err := ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(victim), "server.crt"))
Expect(err).NotTo(HaveOccurred())

assertBlockReception(map[string]int{
"systemchannel": 1,
}, orderers, peer, network)

By("Removing OSN from the channel")
removeConsenter(network, peer, victim, "systemchannel", victimCertBytes)

remainedOrderers := []*nwo.Orderer{}
remainedRunners := []*ginkgomon.Runner{}

for i, o := range orderers {
if i == victimIdx {
continue
}
remainedOrderers = append(remainedOrderers, o)
remainedRunners = append(remainedRunners, ordererRunners[i])
}

assertBlockReception(map[string]int{
"systemchannel": 2,
}, remainedOrderers, peer, network)
By("Making sure OSN was evicted and configuration applied")
findLeader(remainedRunners)

By("Restarting all nodes")
o4process.Signal(syscall.SIGTERM)
Eventually(o4process.Wait(), network.EventuallyTimeout).Should(Receive())
ordererProc.Signal(syscall.SIGTERM)
Eventually(ordererProc.Wait(), network.EventuallyTimeout).Should(Receive())

r1 := network.OrdererRunner(remainedOrderers[1])
r2 := network.OrdererRunner(remainedOrderers[2])
orderersMembers = grouper.Members{
{Name: remainedOrderers[1].ID(), Runner: r1},
{Name: remainedOrderers[2].ID(), Runner: r2},
}

ordererGroup = grouper.NewParallel(syscall.SIGTERM, orderersMembers)
ordererProc = ifrit.Invoke(ordererGroup)
Eventually(ordererProc.Ready(), network.EventuallyTimeout).Should(BeClosed())
findLeader([]*ginkgomon.Runner{r1, r2})

By("Submitting several transactions to trigger snapshot")
env := CreateBroadcastEnvelope(network, remainedOrderers[1], "systemchannel", make([]byte, 2000))
for i := 3; i <= 10; i++ {
// Note that MaxMessageCount is 1 be default, so every tx results in a new block
resp, err := ordererclient.Broadcast(network, remainedOrderers[1], env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
}

assertBlockReception(map[string]int{
"systemchannel": 10,
}, []*nwo.Orderer{remainedOrderers[2]}, peer, network)

By("Clean snapshot folder of lagging behind node")
snapDir := path.Join(network.RootDir, "orderers", remainedOrderers[0].ID(), "etcdraft", "snapshot")
snapshots, err := ioutil.ReadDir(snapDir)
Expect(err).NotTo(HaveOccurred())

for _, snap := range snapshots {
os.RemoveAll(path.Join(snapDir, snap.Name()))
}

ordererProc.Signal(syscall.SIGTERM)
Eventually(ordererProc.Wait(), network.EventuallyTimeout).Should(Receive())

r0 := network.OrdererRunner(remainedOrderers[0])
r1 = network.OrdererRunner(remainedOrderers[1])
orderersMembers = grouper.Members{
{Name: remainedOrderers[0].ID(), Runner: r0},
{Name: remainedOrderers[1].ID(), Runner: r1},
}

ordererGroup = grouper.NewParallel(syscall.SIGTERM, orderersMembers)
ordererProc = ifrit.Invoke(ordererGroup)
Eventually(ordererProc.Ready(), network.EventuallyTimeout).Should(BeClosed())
findLeader([]*ginkgomon.Runner{r0, r1})

By("Asserting that orderer1 receives and persists snapshot")
Eventually(func() int {
files, err := ioutil.ReadDir(path.Join(snapDir, "systemchannel"))
Expect(err).NotTo(HaveOccurred())
return len(files)
}, network.EventuallyTimeout).Should(BeNumerically(">", 0))

By("Make sure we can restart and connect to orderer1 with orderer4")
ordererProc.Signal(syscall.SIGTERM)
Eventually(ordererProc.Wait(), network.EventuallyTimeout).Should(Receive())

r0 = network.OrdererRunner(remainedOrderers[0])
r2 = network.OrdererRunner(remainedOrderers[2])
orderersMembers = grouper.Members{
{Name: remainedOrderers[0].ID(), Runner: r0},
{Name: remainedOrderers[2].ID(), Runner: r2},
}

ordererGroup = grouper.NewParallel(syscall.SIGTERM, orderersMembers)
ordererProc = ifrit.Invoke(ordererGroup)
Eventually(ordererProc.Ready(), network.EventuallyTimeout).Should(BeClosed())
findLeader([]*ginkgomon.Runner{r0, r2})

for i := 1; i <= 10; i++ {
blko1 := FetchBlock(network, remainedOrderers[0], uint64(i), "systemchannel")
blko2 := FetchBlock(network, remainedOrderers[2], uint64(i), "systemchannel")
Expect(blko1.Header.DataHash).To(Equal(blko2.Header.DataHash))
metao1, err := protoutil.GetConsenterMetadataFromBlock(blko1)
Expect(err).NotTo(HaveOccurred())
metao2, err := protoutil.GetConsenterMetadataFromBlock(blko2)
Expect(err).NotTo(HaveOccurred())

bmo1 := &etcdraft.BlockMetadata{}
proto.Unmarshal(metao1.Value, bmo1)
bmo2 := &etcdraft.BlockMetadata{}
proto.Unmarshal(metao2.Value, bmo2)

Expect(bmo2).To(Equal(bmo1))
}
})
})

Expand Down
10 changes: 8 additions & 2 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,12 +951,18 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
}

func (c *Chain) commitBlock(block *common.Block) {
// read consenters metadata to write into the replicated block
blockMeta, err := protoutil.GetConsenterMetadataFromBlock(block)
if err != nil {
c.logger.Panicf("Failed to obtain metadata: %s", err)
}

if !protoutil.IsConfigBlock(block) {
c.support.WriteBlock(block, nil)
c.support.WriteBlock(block, blockMeta.Value)
return
}

c.support.WriteConfigBlock(block, nil)
c.support.WriteConfigBlock(block, blockMeta.Value)

configMembership := c.detectConfChange(block)

Expand Down

0 comments on commit 29cbe91

Please sign in to comment.