From 71b7fda8704f831f77db638272bb18fc25034f28 Mon Sep 17 00:00:00 2001
From: Yoav Tock
Date: Wed, 23 Aug 2023 19:05:33 +0300
Subject: [PATCH] CFT Block Puller: Reset total sleep time

When a block arrives, reset the total sleep time in addition to the
consecutive-failure counter. The total sleep time is what is measured
against MaxRetryDuration (peer.deliveryclient.reconnectTotalTimeThreshold),
so resetting it on every received block makes that threshold bound a
single continuous outage instead of accumulating across the lifetime of
the connection.

Signed-off-by: Yoav Tock
Change-Id: I340cd3f6026f68e31af72410f5f42790d8d429d9
---
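Reviewer note (this text sits between the '---' separator and the
diffstat, so 'git am' ignores it): the change completes the usual
reset-on-success back-off pattern -- a received block must clear not
only the consecutive-failure counter but also the accumulated sleep
time. The Go program below is a minimal, self-contained sketch of that
pattern under the test configuration (InitialRetryInterval 100ms,
backoffExponentBase 1.2, MaxRetryInterval 10s, MaxRetryDuration 1h);
pullBlocks, pullOneBlock, and backoff are illustrative placeholders,
not the Deliverer API.

    package main

    import (
    	"errors"
    	"fmt"
    	"math"
    	"time"
    )

    // backoff mirrors the policy the tests exercise:
    // 100ms * 1.2^n, truncated to whole milliseconds, capped at max.
    func backoff(initial, max time.Duration, n int) time.Duration {
    	d := time.Duration(float64(initial/time.Millisecond)*math.Pow(1.2, float64(n))) * time.Millisecond
    	if d > max {
    		return max
    	}
    	return d
    }

    // pullOneBlock stands in for one Deliver stream attempt; it fails
    // twice and then delivers a block, like a short outage.
    func pullOneBlock(attempt int) error {
    	if attempt < 3 {
    		return errors.New("deliver-error")
    	}
    	return nil
    }

    func main() {
    	const (
    		initialInterval = 100 * time.Millisecond // InitialRetryInterval
    		maxInterval     = 10 * time.Second      // MaxRetryInterval
    		maxTotal        = time.Hour             // MaxRetryDuration
    	)
    	failures := 0
    	total := time.Duration(0)
    	for attempt := 1; ; attempt++ {
    		if failures > 0 {
    			sleep := backoff(initialInterval, maxInterval, failures-1)
    			total += sleep
    			if total > maxTotal {
    				// Corresponds to MaxRetryDurationExceededHandler returning true.
    				fmt.Println("giving up after", total)
    				return
    			}
    			time.Sleep(sleep)
    		}
    		if err := pullOneBlock(attempt); err != nil {
    			failures++ // only consecutive failures grow the back-off
    			continue
    		}
    		// A block arrived: reset both counters. Resetting total is what
    		// this patch adds; without it, repeated short outages would
    		// eventually exhaust maxTotal even though blocks keep flowing.
    		failures = 0
    		total = 0
    		fmt.Println("block received on attempt", attempt)
    		return
    	}
    }

With the reset in place, each burst in the "short bursts" test below
restarts the sequence at 100ms (see SleepArgsForCall(299)) and the
duration-exceeded handler is never invoked.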
 .../bft_header_receiver_test.go               |  2 +-
 internal/pkg/peer/blocksprovider/deliverer.go |  5 +-
 .../pkg/peer/blocksprovider/deliverer_test.go | 96 ++++++++++++++++++-
 3 files changed, 96 insertions(+), 7 deletions(-)

diff --git a/internal/pkg/peer/blocksprovider/bft_header_receiver_test.go b/internal/pkg/peer/blocksprovider/bft_header_receiver_test.go
index c0a340f8449..68c72381c15 100644
--- a/internal/pkg/peer/blocksprovider/bft_header_receiver_test.go
+++ b/internal/pkg/peer/blocksprovider/bft_header_receiver_test.go
@@ -263,7 +263,7 @@ func prepareBlock(seq uint64, contentType orderer.SeekInfo_SeekContentType, good
 	for i := 0; i < numTx; i++ {
 		data.Data[i] = []byte{byte(i), byte(seq)}
 	}
-	block.Header.DataHash, _ = protoutil.BlockDataHash(data)
+	block.Header.DataHash = protoutil.ComputeBlockDataHash(data)
 	if contentType == orderer.SeekInfo_BLOCK {
 		block.Data = data
 	}
diff --git a/internal/pkg/peer/blocksprovider/deliverer.go b/internal/pkg/peer/blocksprovider/deliverer.go
index 5392061691d..f1e0c22d77c 100644
--- a/internal/pkg/peer/blocksprovider/deliverer.go
+++ b/internal/pkg/peer/blocksprovider/deliverer.go
@@ -74,7 +74,6 @@ type DeliverStreamer interface {
 //
 // In the peer, with gossip and a dynamic leader, stopping causes the gossip leader to yield.
 // In the peer, with gossip and a static leader, we never stop.
-// In the orderer, we never stop.
 type MaxRetryDurationExceededHandler func() (stopRetries bool)
 
 const backoffExponentBase = 1.2
@@ -150,7 +149,7 @@ func (d *Deliverer) DeliverBlocks() {
 			totalDuration += sleepDuration
 			if totalDuration > d.MaxRetryDuration {
 				if d.MaxRetryDurationExceededHandler() {
-					d.Logger.Warningf("attempted to retry block delivery for more than peer.deliveryclient.reconnectTotalTimeThreshold duration %v, giving up", d.MaxRetryDuration)
+					d.Logger.Warningf("attempted to retry block delivery for more than peer.deliveryclient.reconnectTotalTimeThreshold duration (%s), giving up", d.MaxRetryDuration)
 					return
 				}
 				d.Logger.Warningf("peer is a static leader, ignoring peer.deliveryclient.reconnectTotalTimeThreshold")
@@ -203,6 +202,7 @@ func (d *Deliverer) DeliverBlocks() {
 		blockReceiver.Start() // starts an internal goroutine
 		onSuccess := func(blockNum uint64) {
 			failureCounter = 0
+			totalDuration = time.Duration(0)
 		}
 		if err := blockReceiver.ProcessIncoming(onSuccess); err != nil {
 			switch err.(type) {
@@ -211,6 +211,7 @@
 			case *ErrStopping:
 				// Don't count it as an error, it is a signal to stop.
 			default:
+				d.Logger.Warningf("Failure in processing incoming messages: %s", err)
 				failureCounter++
 			}
 		}
diff --git a/internal/pkg/peer/blocksprovider/deliverer_test.go b/internal/pkg/peer/blocksprovider/deliverer_test.go
index a2389273189..056e70755ec 100644
--- a/internal/pkg/peer/blocksprovider/deliverer_test.go
+++ b/internal/pkg/peer/blocksprovider/deliverer_test.go
@@ -39,6 +39,7 @@ var _ = Describe("CFT-Deliverer", func() {
 		fakeDeliverStreamer         *fake.DeliverStreamer
 		fakeDeliverClient           *fake.DeliverClient
 		fakeSleeper                 *fake.Sleeper
+		fakeDurationExceededHandler *fake.DurationExceededHandler
 		doneC                       chan struct{}
 		recvStep                    chan struct{}
 		endC                        chan struct{}
@@ -98,6 +99,9 @@
 		fakeDeliverStreamer = &fake.DeliverStreamer{}
 		fakeDeliverStreamer.DeliverReturns(fakeDeliverClient, nil)
 
+		fakeDurationExceededHandler = &fake.DurationExceededHandler{}
+		fakeDurationExceededHandler.DurationExceededHandlerReturns(false)
+
 		d = &blocksprovider.Deliverer{
 			ChannelID:    "channel-id",
 			BlockHandler: fakeBlockHandler,
@@ -111,7 +115,7 @@
 			Logger:           flogging.MustGetLogger("blocksprovider"),
 			TLSCertHash:      []byte("tls-cert-hash"),
 			MaxRetryDuration: time.Hour,
-			MaxRetryDurationExceededHandler: func() (stopRetries bool) { return false },
+			MaxRetryDurationExceededHandler: fakeDurationExceededHandler.DurationExceededHandler,
 			MaxRetryInterval:     10 * time.Second,
 			InitialRetryInterval: 100 * time.Millisecond,
 		}
@@ -269,17 +273,101 @@
 	When("the consecutive errors are unbounded and the peer is not a static leader", func() {
 		BeforeEach(func() {
+			fakeDurationExceededHandler.DurationExceededHandlerReturns(true)
 			fakeDeliverStreamer.DeliverReturns(nil, fmt.Errorf("deliver-error"))
 			fakeDeliverStreamer.DeliverReturnsOnCall(500, fakeDeliverClient, nil)
 		})
 
 		It("hits the maximum sleep time value in an exponential fashion and retries until exceeding the max retry duration", func() {
-			d.MaxRetryDurationExceededHandler = func() (stopRetries bool) { return true }
+			Eventually(fakeDurationExceededHandler.DurationExceededHandlerCallCount).Should(BeNumerically(">", 0))
+			Eventually(endC).Should(BeClosed())
+
 			Eventually(fakeSleeper.SleepCallCount, 5*time.Second).Should(Equal(380))
 			Expect(fakeSleeper.SleepArgsForCall(25)).To(Equal(9539 * time.Millisecond))
 			Expect(fakeSleeper.SleepArgsForCall(26)).To(Equal(10 * time.Second))
 			Expect(fakeSleeper.SleepArgsForCall(27)).To(Equal(10 * time.Second))
 			Expect(fakeSleeper.SleepArgsForCall(379)).To(Equal(10 * time.Second))
+
+			Expect(fakeDurationExceededHandler.DurationExceededHandlerCallCount()).Should(Equal(1))
 		})
 	})
+
+	When("the consecutive errors are coming in short bursts and the peer is not a static leader", func() {
+		BeforeEach(func() {
+			// appease the race detector
+			doneC := doneC
+			recvStep := recvStep
+			fakeDeliverClient := fakeDeliverClient
+
+			fakeDeliverClient.CloseSendStub = func() error {
+				if fakeDeliverClient.CloseSendCallCount() >= 1000 {
+					select {
+					case <-doneC:
+					case recvStep <- struct{}{}:
+					}
+				}
+				return nil
+			}
+
+			fakeDeliverClient.RecvStub = func() (*orderer.DeliverResponse, error) {
+				c := fakeDeliverClient.RecvCallCount()
+				switch c {
+				case 300:
+					return &orderer.DeliverResponse{
+						Type: &orderer.DeliverResponse_Block{
+							Block: &common.Block{
+								Header: &common.BlockHeader{
+									Number: 8,
+								},
+							},
+						},
+					}, nil
+				case 600:
+					return &orderer.DeliverResponse{
+						Type: &orderer.DeliverResponse_Block{
+							Block: &common.Block{
+								Header: &common.BlockHeader{
+									Number: 9,
+								},
+							},
+						},
+					}, nil
+				case 900:
+					return &orderer.DeliverResponse{
+						Type: &orderer.DeliverResponse_Block{
+							Block: &common.Block{
+								Header: &common.BlockHeader{
+									Number: 9,
+								},
+							},
+						},
+					}, nil
+				default:
+					if c < 900 {
+						return nil, fmt.Errorf("fake-recv-error-XXX")
+					}
+
+					select {
+					case <-recvStep:
+						return nil, fmt.Errorf("fake-recv-step-error-XXX")
+					case <-doneC:
+						return nil, nil
+					}
+				}
+			}
+
+			fakeDurationExceededHandler.DurationExceededHandlerReturns(true)
+		})
+
+		It("hits the maximum sleep time value in an exponential fashion and retries but does not exceed the max retry duration", func() {
+			Eventually(fakeSleeper.SleepCallCount, 10*time.Second).Should(Equal(897))
+			Expect(fakeSleeper.SleepArgsForCall(0)).To(Equal(100 * time.Millisecond))
+			Expect(fakeSleeper.SleepArgsForCall(25)).To(Equal(9539 * time.Millisecond))
+			Expect(fakeSleeper.SleepArgsForCall(26)).To(Equal(10 * time.Second))
+			Expect(fakeSleeper.SleepArgsForCall(27)).To(Equal(10 * time.Second))
+			Expect(fakeSleeper.SleepArgsForCall(298)).To(Equal(10 * time.Second))
+			Expect(fakeSleeper.SleepArgsForCall(299)).To(Equal(100 * time.Millisecond))
+			Expect(fakeSleeper.SleepArgsForCall(2*299 - 1)).To(Equal(10 * time.Second))
+			Expect(fakeSleeper.SleepArgsForCall(2 * 299)).To(Equal(100 * time.Millisecond))
+			Expect(fakeSleeper.SleepArgsForCall(3*299 - 1)).To(Equal(10 * time.Second))
+
+			Expect(fakeDurationExceededHandler.DurationExceededHandlerCallCount()).Should(Equal(0))
+		})
+	})
@@ -290,12 +378,12 @@
 		})
 
 		It("hits the maximum sleep time value in an exponential fashion and retries indefinitely", func() {
-			d.MaxRetryDurationExceededHandler = func() (stopRetries bool) { return false }
 			Eventually(fakeSleeper.SleepCallCount, 5*time.Second).Should(Equal(500))
 			Expect(fakeSleeper.SleepArgsForCall(25)).To(Equal(9539 * time.Millisecond))
 			Expect(fakeSleeper.SleepArgsForCall(26)).To(Equal(10 * time.Second))
 			Expect(fakeSleeper.SleepArgsForCall(27)).To(Equal(10 * time.Second))
-			Expect(fakeSleeper.SleepArgsForCall(379)).To(Equal(10 * time.Second))
+			Expect(fakeSleeper.SleepArgsForCall(499)).To(Equal(10 * time.Second))
+			Eventually(fakeDurationExceededHandler.DurationExceededHandlerCallCount).Should(Equal(120))
 		})
 	})
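Reviewer note (below the last hunk, so not part of what 'git am'
applies): the durations asserted in these tests follow from the
back-off constants -- sleep n is 100ms * 1.2^n, truncated to whole
milliseconds and capped at MaxRetryInterval. The standalone check below
reproduces them; the exact truncation behaviour is an assumption
inferred from the asserted 9539ms at step 25.

    package main

    import (
    	"fmt"
    	"math"
    	"time"
    )

    func main() {
    	const (
    		initial  = 100 * time.Millisecond
    		maxSleep = 10 * time.Second
    	)
    	// sleep n = 100ms * 1.2^n, truncated to whole ms, capped at 10s
    	for n := 0; n < 28; n++ {
    		d := time.Duration(float64(initial/time.Millisecond)*math.Pow(1.2, float64(n))) * time.Millisecond
    		if d > maxSleep {
    			d = maxSleep
    		}
    		fmt.Printf("sleep %2d: %v\n", n, d)
    	}
    	// The output ends: sleep 25: 9.539s, sleep 26: 10s, sleep 27: 10s.
    }

Sleeps 0-25 sum to roughly 56.7s and every later sleep adds 10s, so
after sleep 379 the total is about 3597s; the next 10s increment pushes
it past the one-hour MaxRetryDuration, the handler is invoked once, and
delivery stops -- which is exactly the 380-sleep, one-call expectation
in the first test above.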