From a99dbc2eaeb5c7e991b63430fadb316a4b80956a Mon Sep 17 00:00:00 2001
From: Fedor Partanskiy
Date: Mon, 11 Dec 2023 03:46:07 +0300
Subject: [PATCH] remove the request from the pool if verification shows the proposal is bad

Signed-off-by: Fedor Partanskiy
---
 integration/smartbft/smartbft_test.go | 110 ++++++++++++++++++++++++++
 orderer/consensus/smartbft/chain.go   |   8 ++
 2 files changed, 118 insertions(+)

diff --git a/integration/smartbft/smartbft_test.go b/integration/smartbft/smartbft_test.go
index ce1c123d10e..30d140621dc 100644
--- a/integration/smartbft/smartbft_test.go
+++ b/integration/smartbft/smartbft_test.go
@@ -19,6 +19,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
 
@@ -1809,6 +1810,115 @@ var _ = Describe("EndToEnd Smart BFT configuration test", func() {
 		By("invoking the chaincode")
 		invokeQuery(network, peer, network.Orderers[1], channel, 90)
 	})
+
+	It("sends an update transaction to each orderer and deletes failed requests", func() {
+		channel := "testchannel1"
+		By("Create network")
+		networkConfig := nwo.MultiNodeSmartBFT()
+		networkConfig.Channels = nil
+		network = nwo.New(networkConfig, testDir, client, StartPort(), components)
+		network.GenerateConfigTree()
+		network.Bootstrap()
+		network.EventuallyTimeout *= 2
+
+		By("Start orderers")
+		var ordererRunners []*ginkgomon.Runner
+		for _, orderer := range network.Orderers {
+			runner := network.OrdererRunner(orderer,
+				"FABRIC_LOGGING_SPEC=orderer.common.cluster=debug:orderer.consensus.smartbft=debug:policies.ImplicitOrderer=debug",
+				"ORDERER_GENERAL_BACKOFF_MAXDELAY=20s")
+			ordererRunners = append(ordererRunners, runner)
+			proc := ifrit.Invoke(runner)
+			ordererProcesses = append(ordererProcesses, proc)
+			Eventually(proc.Ready(), network.EventuallyTimeout).Should(BeClosed())
+		}
+
+		By("Start peer")
+		peerRunner := network.PeerGroupRunner()
+		peerProcesses = ifrit.Invoke(peerRunner)
+		Eventually(peerProcesses.Ready(), network.EventuallyTimeout).Should(BeClosed())
+		peer := network.Peer("Org1", "peer0")
+
+		By("Join network to channel")
+		joinChannel(network, channel)
+
+		By("Waiting for followers to see the leader")
+		Eventually(ordererRunners[1].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1"))
+		Eventually(ordererRunners[2].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1"))
+		Eventually(ordererRunners[3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1"))
+
+		By("Joining peers to channel")
+		orderer1 := network.Orderers[0]
+		network.JoinChannel(channel, orderer1, network.PeersWithChannel(channel)...)
+
+		By("Deploying chaincode")
+		deployChaincode(network, channel, testDir)
+
+		By("querying the chaincode")
+		sess, err := network.PeerUserSession(peer, "User1", commands.ChaincodeQuery{
+			ChannelID: channel,
+			Name:      "mycc",
+			Ctor:      `{"Args":["query","a"]}`,
+		})
+		Expect(err).NotTo(HaveOccurred())
+		Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
+		Expect(sess).To(gbytes.Say("100"))
+
+		By("invoking the chaincode")
+		invokeQuery(network, peer, network.Orderers[1], channel, 90)
+
+		By("Changing the channel config, e.g. maximum number of bytes in a 1 MB batch")
+		newAbsoluteMaxBytes := 1_000_000
+		config := nwo.GetConfig(network, peer, orderer1, channel)
+		updatedConfig := proto.Clone(config).(*common.Config)
+		batchSizeConfigValue := updatedConfig.ChannelGroup.Groups["Orderer"].Values["BatchSize"]
+		batchSizeValue := &ordererProtos.BatchSize{}
+		Expect(proto.Unmarshal(batchSizeConfigValue.Value, batchSizeValue)).To(Succeed())
+		batchSizeValue.AbsoluteMaxBytes = uint32(newAbsoluteMaxBytes)
+		updatedConfig.ChannelGroup.Groups["Orderer"].Values["BatchSize"] = &common.ConfigValue{
+			ModPolicy: "Admins",
+			Value:     protoutil.MarshalOrPanic(batchSizeValue),
+		}
+
+		tempDir, err := os.MkdirTemp(network.RootDir, "updateConfig")
+		Expect(err).NotTo(HaveOccurred())
+		updateFile := filepath.Join(tempDir, "update.pb")
+		defer os.RemoveAll(tempDir)
+
+		nwo.ComputeUpdateOrdererConfig(updateFile, network, channel, config, updatedConfig, peer, orderer1)
+
+		fileData, err := os.ReadFile(updateFile)
+		Expect(err).NotTo(HaveOccurred())
+
+		ctxEnv, err := protoutil.UnmarshalEnvelope(fileData)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Begin broadcast")
+		var wg sync.WaitGroup
+		for i := range network.Orderers {
+			wg.Add(1)
+			go func(ccid int) {
+				defer wg.Done()
+				resp, err := ordererclient.Broadcast(network, network.Orderers[ccid], ctxEnv)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(resp.Status).To(Equal(common.Status_SUCCESS))
+			}(i)
+		}
+		wg.Wait()
+		By("End broadcast")
+
+		// advance each orderer's log cursor past the point where the config update was verified
+		Eventually(ordererRunners[0].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("verifyConfigUpdateMsg"))
+		Eventually(ordererRunners[1].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("verifyConfigUpdateMsg"))
+		Eventually(ordererRunners[2].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("verifyConfigUpdateMsg"))
+		Eventually(ordererRunners[3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("verifyConfigUpdateMsg"))
+
+		// the channel update request should then be deleted from each orderer's request pool
+		Eventually(ordererRunners[0].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("deleteRequest"))
+		Eventually(ordererRunners[1].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("deleteRequest"))
+		Eventually(ordererRunners[2].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("deleteRequest"))
+		Eventually(ordererRunners[3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("deleteRequest"))
+	})
 	})
 })

diff --git a/orderer/consensus/smartbft/chain.go b/orderer/consensus/smartbft/chain.go
index e8fb96312e2..1f87bb0ae2d 100644
--- a/orderer/consensus/smartbft/chain.go
+++ b/orderer/consensus/smartbft/chain.go
@@ -312,6 +312,13 @@ func (c *BFTChain) pruneCommittedRequests(block *cb.Block) {
 	wg.Wait()
 }
 
+func (c *BFTChain) pruneBadRequests() {
+	c.consensus.Pool.Prune(func(req []byte) error {
+		_, err := c.consensus.Verifier.VerifyRequest(req)
+		return err
+	})
+}
+
 func (c *BFTChain) submit(env *cb.Envelope, configSeq uint64) error {
 	reqBytes, err := proto.Marshal(env)
 	if err != nil {
@@ -566,6 +573,7 @@ func (c *BFTChain) updateRuntimeConfig(block *cb.Block) types.Reconfig {
 	if protoutil.IsConfigBlock(block) {
 		c.Comm.Configure(c.Channel, newRTC.RemoteNodes)
 		c.clusterService.ConfigureNodeCerts(c.Channel, newRTC.consenters)
+		c.pruneBadRequests()
 	}
 
 	membershipDidNotChange := reflect.DeepEqual(newRTC.Nodes, prevRTC.Nodes)
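
Note on the approach: the new pruneBadRequests hook depends on the request pool exposing a Prune entry point that re-runs verification over every pending request and discards the ones that fail, so that requests invalidated by a committed config change do not linger until they time out. Below is a minimal, self-contained Go sketch of that prune-by-predicate pattern; the requestPool type, its Prune method, and the "deleteRequest" log line are illustrative stand-ins for this example, not the consensus library's actual implementation.

// Illustrative sketch only: a minimal stand-in for the consensus request pool,
// showing the prune-by-predicate pattern that pruneBadRequests relies on.
package main

import (
	"errors"
	"fmt"
	"sync"
)

// requestPool holds raw requests that are waiting to be batched into blocks.
type requestPool struct {
	mu       sync.Mutex
	requests [][]byte
}

// Prune re-verifies every pending request and removes the ones that fail,
// e.g. requests that a newly committed channel config has made invalid.
func (p *requestPool) Prune(verify func(req []byte) error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	kept := p.requests[:0]
	for _, req := range p.requests {
		if err := verify(req); err != nil {
			fmt.Printf("deleteRequest: dropping request %q: %v\n", req, err)
			continue
		}
		kept = append(kept, req)
	}
	p.requests = kept
}

func main() {
	pool := &requestPool{requests: [][]byte{
		[]byte("still-valid-request"),
		[]byte("stale-config-update"),
	}}

	// After a config block commits, re-check everything left in the pool,
	// mirroring what pruneBadRequests does with the chain's verifier.
	pool.Prune(func(req []byte) error {
		if string(req) == "stale-config-update" {
			return errors.New("request no longer valid under the new channel config")
		}
		return nil
	})

	fmt.Println("requests remaining in the pool:", len(pool.requests)) // prints 1
}

In the patch itself the same idea is wired into updateRuntimeConfig: whenever protoutil.IsConfigBlock reports a config block, the pool is pruned with the chain's verifier, and the dropped requests surface as the deleteRequest log lines that the new integration test waits for on every orderer.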