Skip to content

Commit

Permalink
CFT Block Puller: Reset total sleep time
Browse files · Browse the repository at this point in the history
When a block arrives, reset the total sleep time.

Signed-off-by: Yoav Tock <tock@il.ibm.com>
Change-Id: I340cd3f6026f68e31af72410f5f42790d8d429d9
  • Loading branch information
tock-ibm committed Oct 24, 2023
1 parent d36d5fb commit 66333d9
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 6 deletions.
5 changes: 3 additions & 2 deletions internal/pkg/peer/blocksprovider/deliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ type DeliverStreamer interface {
//
// In the peer, with gossip and a dynamic leader, stopping causes the gossip leader to yield.
// In the peer, with gossip and a static leader, we never stop.
// In the orderer, we never stop.
type MaxRetryDurationExceededHandler func() (stopRetries bool)

const backoffExponentBase = 1.2
Expand Down Expand Up @@ -150,7 +149,7 @@ func (d *Deliverer) DeliverBlocks() {
totalDuration += sleepDuration
if totalDuration > d.MaxRetryDuration {
if d.MaxRetryDurationExceededHandler() {
d.Logger.Warningf("attempted to retry block delivery for more than peer.deliveryclient.reconnectTotalTimeThreshold duration %v, giving up", d.MaxRetryDuration)
d.Logger.Warningf("attempted to retry block delivery for more than peer.deliveryclient.reconnectTotalTimeThreshold duration (%s), giving up", d.MaxRetryDuration)
return
}
d.Logger.Warningf("peer is a static leader, ignoring peer.deliveryclient.reconnectTotalTimeThreshold")
Expand Down Expand Up @@ -203,6 +202,7 @@ func (d *Deliverer) DeliverBlocks() {
blockReceiver.Start() // starts an internal goroutine
onSuccess := func(blockNum uint64) {
failureCounter = 0
totalDuration = time.Duration(0)
}
if err := blockReceiver.ProcessIncoming(onSuccess); err != nil {
switch err.(type) {
Expand All @@ -211,6 +211,7 @@ func (d *Deliverer) DeliverBlocks() {
case *ErrStopping:
// Don't count it as an error, it is a signal to stop.
default:
d.Logger.Warningf("Failure in processing incoming messages: %s", err)
failureCounter++
}
}
Expand Down
96 changes: 92 additions & 4 deletions internal/pkg/peer/blocksprovider/deliverer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var _ = Describe("CFT-Deliverer", func() {
fakeDeliverStreamer *fake.DeliverStreamer
fakeDeliverClient *fake.DeliverClient
fakeSleeper *fake.Sleeper
fakeDurationExceededHandler *fake.DurationExceededHandler
doneC chan struct{}
recvStep chan struct{}
endC chan struct{}
Expand Down Expand Up @@ -98,6 +99,9 @@ var _ = Describe("CFT-Deliverer", func() {
fakeDeliverStreamer = &fake.DeliverStreamer{}
fakeDeliverStreamer.DeliverReturns(fakeDeliverClient, nil)

fakeDurationExceededHandler = &fake.DurationExceededHandler{}
fakeDurationExceededHandler.DurationExceededHandlerReturns(false)

d = &blocksprovider.Deliverer{
ChannelID: "channel-id",
BlockHandler: fakeBlockHandler,
Expand All @@ -111,7 +115,7 @@ var _ = Describe("CFT-Deliverer", func() {
Logger: flogging.MustGetLogger("blocksprovider"),
TLSCertHash: []byte("tls-cert-hash"),
MaxRetryDuration: time.Hour,
MaxRetryDurationExceededHandler: func() (stopRetries bool) { return false },
MaxRetryDurationExceededHandler: fakeDurationExceededHandler.DurationExceededHandler,
MaxRetryInterval: 10 * time.Second,
InitialRetryInterval: 100 * time.Millisecond,
}
Expand Down Expand Up @@ -269,17 +273,101 @@ var _ = Describe("CFT-Deliverer", func() {

When("the consecutive errors are unbounded and the peer is not a static leader", func() {
BeforeEach(func() {
fakeDurationExceededHandler.DurationExceededHandlerReturns(true)
fakeDeliverStreamer.DeliverReturns(nil, fmt.Errorf("deliver-error"))
fakeDeliverStreamer.DeliverReturnsOnCall(500, fakeDeliverClient, nil)
})

It("hits the maximum sleep time value in an exponential fashion and retries until exceeding the max retry duration", func() {
d.MaxRetryDurationExceededHandler = func() (stopRetries bool) { return true }
Eventually(fakeDurationExceededHandler.DurationExceededHandlerCallCount).Should(BeNumerically(">", 0))
Eventually(endC).Should(BeClosed())
Eventually(fakeSleeper.SleepCallCount, 5*time.Second).Should(Equal(380))
Expect(fakeSleeper.SleepArgsForCall(25)).To(Equal(9539 * time.Millisecond))
Expect(fakeSleeper.SleepArgsForCall(26)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(27)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(379)).To(Equal(10 * time.Second))
Expect(fakeDurationExceededHandler.DurationExceededHandlerCallCount()).Should(Equal(1))
})
})

When("the consecutive errors are coming in short bursts and the peer is not a static leader", func() {
BeforeEach(func() {
// appease the race detector
doneC := doneC
recvStep := recvStep
fakeDeliverClient := fakeDeliverClient

fakeDeliverClient.CloseSendStub = func() error {
if fakeDeliverClient.CloseSendCallCount() >= 1000 {
select {
case <-doneC:
case recvStep <- struct{}{}:
}
}
return nil
}
fakeDeliverClient.RecvStub = func() (*orderer.DeliverResponse, error) {
c := fakeDeliverClient.RecvCallCount()
switch c {
case 300:
return &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{
Block: &common.Block{
Header: &common.BlockHeader{
Number: 8,
},
},
},
}, nil
case 600:
return &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{
Block: &common.Block{
Header: &common.BlockHeader{
Number: 9,
},
},
},
}, nil
case 900:
return &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{
Block: &common.Block{
Header: &common.BlockHeader{
Number: 9,
},
},
},
}, nil
default:
if c < 900 {
return nil, fmt.Errorf("fake-recv-error-XXX")
}

select {
case <-recvStep:
return nil, fmt.Errorf("fake-recv-step-error-XXX")
case <-doneC:
return nil, nil
}
}
}
fakeDurationExceededHandler.DurationExceededHandlerReturns(true)
})

It("hits the maximum sleep time value in an exponential fashion and retries but does not exceed the max retry duration", func() {
Eventually(fakeSleeper.SleepCallCount, 10*time.Second).Should(Equal(897))
Expect(fakeSleeper.SleepArgsForCall(0)).To(Equal(100 * time.Millisecond))
Expect(fakeSleeper.SleepArgsForCall(25)).To(Equal(9539 * time.Millisecond))
Expect(fakeSleeper.SleepArgsForCall(26)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(27)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(298)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(299)).To(Equal(100 * time.Millisecond))
Expect(fakeSleeper.SleepArgsForCall(2*299 - 1)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(2 * 299)).To(Equal(100 * time.Millisecond))
Expect(fakeSleeper.SleepArgsForCall(3*299 - 1)).To(Equal(10 * time.Second))

Expect(fakeDurationExceededHandler.DurationExceededHandlerCallCount()).Should(Equal(0))
})
})

Expand All @@ -290,12 +378,12 @@ var _ = Describe("CFT-Deliverer", func() {
})

It("hits the maximum sleep time value in an exponential fashion and retries indefinitely", func() {
d.MaxRetryDurationExceededHandler = func() (stopRetries bool) { return false }
Eventually(fakeSleeper.SleepCallCount, 5*time.Second).Should(Equal(500))
Expect(fakeSleeper.SleepArgsForCall(25)).To(Equal(9539 * time.Millisecond))
Expect(fakeSleeper.SleepArgsForCall(26)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(27)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(379)).To(Equal(10 * time.Second))
Expect(fakeSleeper.SleepArgsForCall(499)).To(Equal(10 * time.Second))
Eventually(fakeDurationExceededHandler.DurationExceededHandlerCallCount).Should(Equal(120))
})
})

Expand Down

0 comments on commit 66333d9

Please sign in to comment.