CFT Block Puller: reset total sleep #4402

Merged: 1 commit, Nov 20, 2023
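For context: DeliverBlocks already reset its consecutive-failure counter whenever a block was processed successfully, but the total time accumulated across retry sleeps (totalDuration) was never cleared, so a peer suffering intermittent delivery errors over a long run could eventually exceed peer.deliveryclient.reconnectTotalTimeThreshold and stop retrying even though it kept making progress. This PR clears totalDuration in the onSuccess callback, logs processing failures, and extends the test coverage (the first hunk also adapts a test helper to the single-return protoutil.ComputeBlockDataHash). Below is a rough, self-contained model of the retry accounting, not the actual implementation: helper names are hypothetical, the constants mirror the test setup further down, and the backoff formula is inferred from backoffExponentBase and the durations the tests assert.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Constants mirroring the deliverer configuration used in the tests below.
const (
	initialRetryInterval = 100 * time.Millisecond
	maxRetryInterval     = 10 * time.Second
	maxRetryDuration     = time.Hour // peer.deliveryclient.reconnectTotalTimeThreshold
	backoffExponentBase  = 1.2
)

// backoff is an assumed model of the sleep before the n-th consecutive retry.
func backoff(n int) time.Duration {
	d := time.Duration(float64(initialRetryInterval) * math.Pow(backoffExponentBase, float64(n)))
	if d > maxRetryInterval {
		d = maxRetryInterval
	}
	return d
}

// simulate runs bursts of consecutive delivery failures, each burst followed by
// one successfully received block, and reports whether the accumulated retry
// time ever crosses maxRetryDuration (i.e. whether the exceeded-handler fires).
func simulate(bursts, failuresPerBurst int, resetTotalOnSuccess bool) bool {
	failures, total := 0, time.Duration(0)
	for b := 0; b < bursts; b++ {
		for i := 0; i < failuresPerBurst; i++ {
			total += backoff(failures)
			if total > maxRetryDuration {
				return true // MaxRetryDurationExceededHandler would be invoked
			}
			failures++
		}
		// A block was processed successfully.
		failures = 0 // behaviour before and after this PR
		if resetTotalOnSuccess {
			total = 0 // the change in this PR
		}
	}
	return false
}

func main() {
	// Intermittent failures: 20 bursts of 299 errors, a good block after each burst.
	fmt.Println("old behaviour, gives up:", simulate(20, 299, false)) // true
	fmt.Println("new behaviour, gives up:", simulate(20, 299, true))  // false
}
```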
@@ -263,7 +263,7 @@ func prepareBlock(seq uint64, contentType orderer.SeekInfo_SeekContentType, good
for i := 0; i < numTx; i++ {
data.Data[i] = []byte{byte(i), byte(seq)}
}
- block.Header.DataHash, _ = protoutil.BlockDataHash(data)
+ block.Header.DataHash = protoutil.ComputeBlockDataHash(data)
if contentType == orderer.SeekInfo_BLOCK {
block.Data = data
}
5 changes: 3 additions & 2 deletions internal/pkg/peer/blocksprovider/deliverer.go
@@ -74,7 +74,6 @@ type DeliverStreamer interface {
//
// In the peer, with gossip and a dynamic leader, stopping causes the gossip leader to yield.
// In the peer, with gossip and a static leader, we never stop.
- // In the orderer, we never stop.
type MaxRetryDurationExceededHandler func() (stopRetries bool)

const backoffExponentBase = 1.2
@@ -150,7 +149,7 @@ func (d *Deliverer) DeliverBlocks() {
totalDuration += sleepDuration
if totalDuration > d.MaxRetryDuration {
if d.MaxRetryDurationExceededHandler() {
d.Logger.Warningf("attempted to retry block delivery for more than peer.deliveryclient.reconnectTotalTimeThreshold duration %v, giving up", d.MaxRetryDuration)
d.Logger.Warningf("attempted to retry block delivery for more than peer.deliveryclient.reconnectTotalTimeThreshold duration (%s), giving up", d.MaxRetryDuration)
return
}
d.Logger.Warningf("peer is a static leader, ignoring peer.deliveryclient.reconnectTotalTimeThreshold")
@@ -203,6 +202,7 @@ func (d *Deliverer) DeliverBlocks() {
blockReceiver.Start() // starts an internal goroutine
onSuccess := func(blockNum uint64) {
failureCounter = 0
+ totalDuration = time.Duration(0)
}
if err := blockReceiver.ProcessIncoming(onSuccess); err != nil {
switch err.(type) {
@@ -211,6 +211,7 @@
case *ErrStopping:
// Don't count it as an error, it is a signal to stop.
default:
d.Logger.Warningf("Failure in processing incoming messages: %s", err)
failureCounter++
}
}
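For reference when reading the updated tests below: assuming the n-th consecutive sleep is 100 ms * 1.2^n capped at 10 s (consistent with backoffExponentBase above and the intervals configured in the test setup), sleep 25 comes out at roughly 9539 ms, every sleep from 26 onward is capped at 10 s, and 380 sleeps sum to about 3597 s, just under the one-hour reconnectTotalTimeThreshold. A quick check of those numbers, under the same assumed formula as the sketch above:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Assumed schedule: sleep(n) = 100ms * 1.2^n, capped at 10s.
	total := time.Duration(0)
	for n := 0; ; n++ {
		s := time.Duration(float64(100*time.Millisecond) * math.Pow(1.2, float64(n)))
		if s > 10*time.Second {
			s = 10 * time.Second
		}
		if n == 25 || n == 26 {
			fmt.Printf("sleep %d: %v\n", n, s) // 25: ~9.539s, 26: 10s (capped)
		}
		total += s
		if total > time.Hour {
			// Prints 380: sleeps 0-379 (~3597s) fit under the threshold,
			// the 381st attempt crosses it.
			fmt.Printf("threshold crossed at sleep index %d\n", n)
			break
		}
	}
}
```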
96 changes: 92 additions & 4 deletions internal/pkg/peer/blocksprovider/deliverer_test.go
@@ -39,6 +39,7 @@ var _ = Describe("CFT-Deliverer", func() {
fakeDeliverStreamer *fake.DeliverStreamer
fakeDeliverClient *fake.DeliverClient
fakeSleeper *fake.Sleeper
+ fakeDurationExceededHandler *fake.DurationExceededHandler
doneC chan struct{}
recvStep chan struct{}
endC chan struct{}
@@ -98,6 +99,9 @@ var _ = Describe("CFT-Deliverer", func() {
fakeDeliverStreamer = &fake.DeliverStreamer{}
fakeDeliverStreamer.DeliverReturns(fakeDeliverClient, nil)

fakeDurationExceededHandler = &fake.DurationExceededHandler{}
fakeDurationExceededHandler.DurationExceededHandlerReturns(false)

d = &blocksprovider.Deliverer{
ChannelID: "channel-id",
BlockHandler: fakeBlockHandler,
@@ -111,7 +115,7 @@
Logger: flogging.MustGetLogger("blocksprovider"),
TLSCertHash: []byte("tls-cert-hash"),
MaxRetryDuration: time.Hour,
- MaxRetryDurationExceededHandler: func() (stopRetries bool) { return false },
+ MaxRetryDurationExceededHandler: fakeDurationExceededHandler.DurationExceededHandler,
MaxRetryInterval: 10 * time.Second,
InitialRetryInterval: 100 * time.Millisecond,
}
@@ -269,17 +273,101 @@ var _ = Describe("CFT-Deliverer", func() {

When("the consecutive errors are unbounded and the peer is not a static leader", func() {
BeforeEach(func() {
fakeDurationExceededHandler.DurationExceededHandlerReturns(true)
fakeDeliverStreamer.DeliverReturns(nil, fmt.Errorf("deliver-error"))
fakeDeliverStreamer.DeliverReturnsOnCall(500, fakeDeliverClient, nil)
})

It("hits the maximum sleep time value in an exponential fashion and retries until exceeding the max retry duration", func() {
d.MaxRetryDurationExceededHandler = func() (stopRetries bool) { return true }
Eventually(fakeDurationExceededHandler.DurationExceededHandlerCallCount).Should(BeNumerically(">", 0))
Eventually(endC).Should(BeClosed())
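// Rough arithmetic behind the numbers below: the first 26 sleeps grow exponentially
// from 100ms to ~9.5s (~56.7s in total) and every later sleep is capped at 10s, so
// 380 sleeps add up to ~3597s, just under the 1h MaxRetryDuration; the attempt after
// that crosses the threshold, the handler returns true, and delivery gives up after
// exactly one handler invocation.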
Eventually(fakeSleeper.SleepCallCount, 5*time.Second).Should(Equal(380))
Expect(fakeSleeper.SleepArgsForCall(25)).To(Equal(9539 * time.Millisecond))
Expect(fakeSleeper.SleepArgsForCall(26)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(27)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(379)).To(Equal(10 * time.Second))
Expect(fakeDurationExceededHandler.DurationExceededHandlerCallCount()).Should(Equal(1))
})
})

When("the consecutive errors are coming in short bursts and the peer is not a static leader", func() {
BeforeEach(func() {
// appease the race detector
doneC := doneC
recvStep := recvStep
fakeDeliverClient := fakeDeliverClient

fakeDeliverClient.CloseSendStub = func() error {
if fakeDeliverClient.CloseSendCallCount() >= 1000 {
select {
case <-doneC:
case recvStep <- struct{}{}:
}
}
return nil
}
fakeDeliverClient.RecvStub = func() (*orderer.DeliverResponse, error) {
c := fakeDeliverClient.RecvCallCount()
switch c {
case 300:
return &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{
Block: &common.Block{
Header: &common.BlockHeader{
Number: 8,
},
},
},
}, nil
case 600:
return &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{
Block: &common.Block{
Header: &common.BlockHeader{
Number: 9,
},
},
},
}, nil
case 900:
return &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{
Block: &common.Block{
Header: &common.BlockHeader{
Number: 9,
},
},
},
}, nil
default:
if c < 900 {
return nil, fmt.Errorf("fake-recv-error-XXX")
}

select {
case <-recvStep:
return nil, fmt.Errorf("fake-recv-step-error-XXX")
case <-doneC:
return nil, nil
}
}
}
fakeDurationExceededHandler.DurationExceededHandlerReturns(true)
})

It("hits the maximum sleep time value in an exponential fashion and retries but does not exceed the max retry duration", func() {
Eventually(fakeSleeper.SleepCallCount, 10*time.Second).Should(Equal(897))
Expect(fakeSleeper.SleepArgsForCall(0)).To(Equal(100 * time.Millisecond))
Expect(fakeSleeper.SleepArgsForCall(25)).To(Equal(9539 * time.Millisecond))
Expect(fakeSleeper.SleepArgsForCall(26)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(27)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(298)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(299)).To(Equal(100 * time.Millisecond))
Expect(fakeSleeper.SleepArgsForCall(2*299 - 1)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(2 * 299)).To(Equal(100 * time.Millisecond))
Expect(fakeSleeper.SleepArgsForCall(3*299 - 1)).To(Equal(10 * time.Second))

Expect(fakeDurationExceededHandler.DurationExceededHandlerCallCount()).Should(Equal(0))
})
})

@@ -290,12 +378,12 @@ var _ = Describe("CFT-Deliverer", func() {
})

It("hits the maximum sleep time value in an exponential fashion and retries indefinitely", func() {
d.MaxRetryDurationExceededHandler = func() (stopRetries bool) { return false }
Eventually(fakeSleeper.SleepCallCount, 5*time.Second).Should(Equal(500))
Expect(fakeSleeper.SleepArgsForCall(25)).To(Equal(9539 * time.Millisecond))
Expect(fakeSleeper.SleepArgsForCall(26)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(27)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(379)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(499)).To(Equal(10 * time.Second))
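// With the handler returning false, retries continue past the 1h threshold: the handler
// runs before each of the last 120 of the 500 sleeps (500 - 380 = 120, see the schedule
// arithmetic above).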
Eventually(fakeDurationExceededHandler.DurationExceededHandlerCallCount).Should(Equal(120))
})
})
