Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replicate block metadata with block while OSN catching up (backport #2748) #2757

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 211 additions & 4 deletions integration/raft/cft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ import (
conftx "github.com/hyperledger/fabric-config/configtx"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/msp"
<<<<<<< HEAD
"github.com/hyperledger/fabric/cmd/common/signer"
=======
"github.com/hyperledger/fabric-protos-go/orderer/etcdraft"
>>>>>>> 44ab2bf96 ([FAB-18521] Replicate block metadata with block while OSN catching up (#2748))
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/integration/nwo"
Expand Down Expand Up @@ -233,12 +237,215 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
env = CreateBroadcastEnvelope(network, o1, channelID, make([]byte, 1000))
resp, err := ordererclient.Broadcast(network, o1, env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
Eventually(resp.Status, network.EventuallyTimeout).Should(Equal(common.Status_SUCCESS))

blko1 := FetchBlock(network, o1, 5, channelID)
blko2 := FetchBlock(network, o2, 5, channelID)
for i := 1; i <= 5; i++ {
blko1 := FetchBlock(network, o1, uint64(i), channelID)
blko2 := FetchBlock(network, o2, uint64(i), channelID)

Expect(blko1.Header.DataHash).To(Equal(blko2.Header.DataHash))
Expect(blko1.Header.DataHash).To(Equal(blko2.Header.DataHash))
metao1, err := protoutil.GetConsenterMetadataFromBlock(blko1)
Expect(err).NotTo(HaveOccurred())
metao2, err := protoutil.GetConsenterMetadataFromBlock(blko2)
Expect(err).NotTo(HaveOccurred())

bmo1 := &etcdraft.BlockMetadata{}
proto.Unmarshal(metao1.Value, bmo1)
bmo2 := &etcdraft.BlockMetadata{}
proto.Unmarshal(metao2.Value, bmo2)

Expect(bmo2).To(Equal(bmo1))
}
})

It("catches up and replicates consenters metadata", func() {
network = nwo.New(nwo.MultiNodeEtcdRaft(), testDir, client, StartPort(), components)
orderers := []*nwo.Orderer{network.Orderer("orderer1"), network.Orderer("orderer2"), network.Orderer("orderer3")}
peer = network.Peer("Org1", "peer0")

network.GenerateConfigTree()
network.Bootstrap()

ordererRunners := []*ginkgomon.Runner{}
orderersMembers := grouper.Members{}
for _, o := range orderers {
runner := network.OrdererRunner(o)
ordererRunners = append(ordererRunners, runner)
orderersMembers = append(orderersMembers, grouper.Member{
Name: o.ID(),
Runner: runner,
})
}

By("Starting ordering service cluster")
ordererGroup := grouper.NewParallel(syscall.SIGTERM, orderersMembers)
ordererProc = ifrit.Invoke(ordererGroup)
Eventually(ordererProc.Ready(), network.EventuallyTimeout).Should(BeClosed())

By("Setting up new OSN to be added to the cluster")
o4 := &nwo.Orderer{
Name: "orderer4",
Organization: "OrdererOrg",
}
ports := nwo.Ports{}
for _, portName := range nwo.OrdererPortNames() {
ports[portName] = network.ReservePort()
}

network.PortsByOrdererID[o4.ID()] = ports
network.Orderers = append(network.Orderers, o4)
network.GenerateOrdererConfig(o4)
extendNetwork(network)

ordererCertificatePath := filepath.Join(network.OrdererLocalTLSDir(o4), "server.crt")
ordererCert, err := ioutil.ReadFile(ordererCertificatePath)
Expect(err).NotTo(HaveOccurred())

By("Adding new ordering service node")
addConsenter(network, peer, orderers[0], "systemchannel", etcdraft.Consenter{
ServerTlsCert: ordererCert,
ClientTlsCert: ordererCert,
Host: "127.0.0.1",
Port: uint32(network.OrdererPort(o4, nwo.ClusterPort)),
})

// Get the last config block of the system channel
configBlock := nwo.GetConfigBlock(network, peer, orderers[0], "systemchannel")
// Plant it in the file system of orderer, the new node to be onboarded.
err = ioutil.WriteFile(filepath.Join(testDir, "systemchannel_block.pb"), protoutil.MarshalOrPanic(configBlock), 0o644)

Expect(err).NotTo(HaveOccurred())
By("Starting new ordering service node")
r4 := network.OrdererRunner(o4)
orderers = append(orderers, o4)
ordererRunners = append(ordererRunners, r4)
o4process := ifrit.Invoke(r4)
Eventually(o4process.Ready(), network.EventuallyTimeout).Should(BeClosed())

By("Pick ordering service node to be evicted")
victimIdx := findLeader(ordererRunners) - 1
victim := orderers[victimIdx]
victimCertBytes, err := ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(victim), "server.crt"))
Expect(err).NotTo(HaveOccurred())

assertBlockReception(map[string]int{
"systemchannel": 1,
}, orderers, peer, network)

By("Removing OSN from the channel")
removeConsenter(network, peer, victim, "systemchannel", victimCertBytes)

remainedOrderers := []*nwo.Orderer{}
remainedRunners := []*ginkgomon.Runner{}

for i, o := range orderers {
if i == victimIdx {
continue
}
remainedOrderers = append(remainedOrderers, o)
remainedRunners = append(remainedRunners, ordererRunners[i])
}

assertBlockReception(map[string]int{
"systemchannel": 2,
}, remainedOrderers, peer, network)
By("Making sure OSN was evicted and configuration applied")
findLeader(remainedRunners)

By("Restarting all nodes")
o4process.Signal(syscall.SIGTERM)
Eventually(o4process.Wait(), network.EventuallyTimeout).Should(Receive())
ordererProc.Signal(syscall.SIGTERM)
Eventually(ordererProc.Wait(), network.EventuallyTimeout).Should(Receive())

r1 := network.OrdererRunner(remainedOrderers[1])
r2 := network.OrdererRunner(remainedOrderers[2])
orderersMembers = grouper.Members{
{Name: remainedOrderers[1].ID(), Runner: r1},
{Name: remainedOrderers[2].ID(), Runner: r2},
}

ordererGroup = grouper.NewParallel(syscall.SIGTERM, orderersMembers)
ordererProc = ifrit.Invoke(ordererGroup)
Eventually(ordererProc.Ready(), network.EventuallyTimeout).Should(BeClosed())
findLeader([]*ginkgomon.Runner{r1, r2})

By("Submitting several transactions to trigger snapshot")
env := CreateBroadcastEnvelope(network, remainedOrderers[1], "systemchannel", make([]byte, 2000))
for i := 3; i <= 10; i++ {
// Note that MaxMessageCount is 1 be default, so every tx results in a new block
resp, err := ordererclient.Broadcast(network, remainedOrderers[1], env)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
}

assertBlockReception(map[string]int{
"systemchannel": 10,
}, []*nwo.Orderer{remainedOrderers[2]}, peer, network)

By("Clean snapshot folder of lagging behind node")
snapDir := path.Join(network.RootDir, "orderers", remainedOrderers[0].ID(), "etcdraft", "snapshot")
snapshots, err := ioutil.ReadDir(snapDir)
Expect(err).NotTo(HaveOccurred())

for _, snap := range snapshots {
os.RemoveAll(path.Join(snapDir, snap.Name()))
}

ordererProc.Signal(syscall.SIGTERM)
Eventually(ordererProc.Wait(), network.EventuallyTimeout).Should(Receive())

r0 := network.OrdererRunner(remainedOrderers[0])
r1 = network.OrdererRunner(remainedOrderers[1])
orderersMembers = grouper.Members{
{Name: remainedOrderers[0].ID(), Runner: r0},
{Name: remainedOrderers[1].ID(), Runner: r1},
}

ordererGroup = grouper.NewParallel(syscall.SIGTERM, orderersMembers)
ordererProc = ifrit.Invoke(ordererGroup)
Eventually(ordererProc.Ready(), network.EventuallyTimeout).Should(BeClosed())
findLeader([]*ginkgomon.Runner{r0, r1})

By("Asserting that orderer1 receives and persists snapshot")
Eventually(func() int {
files, err := ioutil.ReadDir(path.Join(snapDir, "systemchannel"))
Expect(err).NotTo(HaveOccurred())
return len(files)
}, network.EventuallyTimeout).Should(BeNumerically(">", 0))

By("Make sure we can restart and connect to orderer1 with orderer4")
ordererProc.Signal(syscall.SIGTERM)
Eventually(ordererProc.Wait(), network.EventuallyTimeout).Should(Receive())

r0 = network.OrdererRunner(remainedOrderers[0])
r2 = network.OrdererRunner(remainedOrderers[2])
orderersMembers = grouper.Members{
{Name: remainedOrderers[0].ID(), Runner: r0},
{Name: remainedOrderers[2].ID(), Runner: r2},
}

ordererGroup = grouper.NewParallel(syscall.SIGTERM, orderersMembers)
ordererProc = ifrit.Invoke(ordererGroup)
Eventually(ordererProc.Ready(), network.EventuallyTimeout).Should(BeClosed())
findLeader([]*ginkgomon.Runner{r0, r2})

for i := 1; i <= 10; i++ {
blko1 := FetchBlock(network, remainedOrderers[0], uint64(i), "systemchannel")
blko2 := FetchBlock(network, remainedOrderers[2], uint64(i), "systemchannel")
Expect(blko1.Header.DataHash).To(Equal(blko2.Header.DataHash))
metao1, err := protoutil.GetConsenterMetadataFromBlock(blko1)
Expect(err).NotTo(HaveOccurred())
metao2, err := protoutil.GetConsenterMetadataFromBlock(blko2)
Expect(err).NotTo(HaveOccurred())

bmo1 := &etcdraft.BlockMetadata{}
proto.Unmarshal(metao1.Value, bmo1)
bmo2 := &etcdraft.BlockMetadata{}
proto.Unmarshal(metao2.Value, bmo2)

Expect(bmo2).To(Equal(bmo1))
}
})
})

Expand Down
10 changes: 8 additions & 2 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,12 +985,18 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
}

func (c *Chain) commitBlock(block *common.Block) {
// read consenters metadata to write into the replicated block
blockMeta, err := protoutil.GetConsenterMetadataFromBlock(block)
if err != nil {
c.logger.Panicf("Failed to obtain metadata: %s", err)
}

if !protoutil.IsConfigBlock(block) {
c.support.WriteBlock(block, nil)
c.support.WriteBlock(block, blockMeta.Value)
return
}

c.support.WriteConfigBlock(block, nil)
c.support.WriteConfigBlock(block, blockMeta.Value)

configMembership := c.detectConfChange(block)

Expand Down