Skip to content

Commit

Permalink
Change options of the pool after reconfig (#571)
Browse files Browse the repository at this point in the history
Signed-off-by: Fedor Partanskiy <pfi79@mail.ru>
Co-authored-by: C0rWin <artem@bargr.net>
  • Loading branch information
pfi79 and C0rWin authored Feb 15, 2024
1 parent 57f434c commit fd3c35e
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 3 deletions.
12 changes: 10 additions & 2 deletions internal/bft/requestpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func NewPool(log api.Logger, inspector api.RequestInspector, th RequestTimeoutHa
return rp
}

// ChangeTimeouts changes the timeout of the pool
func (rp *Pool) ChangeTimeouts(th RequestTimeoutHandler, options PoolOptions) {
// ChangeOptions changes the options of the pool
func (rp *Pool) ChangeOptions(th RequestTimeoutHandler, options PoolOptions) {
rp.lock.Lock()
defer rp.lock.Unlock()

Expand All @@ -162,10 +162,18 @@ func (rp *Pool) ChangeTimeouts(th RequestTimeoutHandler, options PoolOptions) {
if options.AutoRemoveTimeout == 0 {
options.AutoRemoveTimeout = defaultRequestTimeout
}
if options.RequestMaxBytes == 0 {
options.RequestMaxBytes = defaultMaxBytes
}
if options.SubmitTimeout == 0 {
options.SubmitTimeout = defaultRequestTimeout
}

rp.options.ForwardTimeout = options.ForwardTimeout
rp.options.ComplainTimeout = options.ComplainTimeout
rp.options.AutoRemoveTimeout = options.AutoRemoveTimeout
rp.options.RequestMaxBytes = options.RequestMaxBytes
rp.options.SubmitTimeout = options.SubmitTimeout

rp.timeoutHandler = th

Expand Down
40 changes: 40 additions & 0 deletions internal/bft/requestpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,46 @@ func TestReqPoolTimeout(t *testing.T) {
err = pool.Submit(request)
assert.Contains(t, err.Error(), "is bigger than request max bytes")
})
t.Run("change request size", func(t *testing.T) {
timeoutHandler := &mocks.RequestTimeoutHandler{}

timeoutHandler.On("OnRequestTimeout", byteReq1, insp.RequestID(byteReq1)).Return()
timeoutHandler.On("OnLeaderFwdRequestTimeout", byteReq1, insp.RequestID(byteReq1)).Return()
timeoutHandler.On("OnAutoRemoveTimeout", insp.RequestID(byteReq1)).Return()

pool := bft.NewPool(log, insp, timeoutHandler,
bft.PoolOptions{
QueueSize: 3,
ForwardTimeout: 10 * time.Millisecond,
ComplainTimeout: time.Hour,
AutoRemoveTimeout: time.Hour,
RequestMaxBytes: 1024,
},
nil,
)
defer pool.Close()

payload := make([]byte, 2048)
rand.Read(payload)
request := makeTestRequest("1", "1", string(payload))
assert.Equal(t, 0, pool.Size())
err = pool.Submit(request)
assert.Contains(t, err.Error(), "is bigger than request max bytes")

pool.StopTimers()
opts := bft.PoolOptions{
ForwardTimeout: 10 * time.Millisecond,
ComplainTimeout: time.Hour,
AutoRemoveTimeout: time.Hour,
RequestMaxBytes: 1024 * 3,
}
pool.ChangeOptions(timeoutHandler, opts)
pool.RestartTimers()

err = pool.Submit(request)
assert.NoError(t, err)
assert.Equal(t, 1, pool.Size())
})
t.Run("request timeout", func(t *testing.T) {
timeoutHandler := &mocks.RequestTimeoutHandler{}

Expand Down
2 changes: 1 addition & 1 deletion pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (c *Consensus) reconfig(reconfig types.Reconfig) {
RequestMaxBytes: c.Config.RequestMaxBytes,
SubmitTimeout: c.Config.RequestPoolSubmitTimeout,
}
c.Pool.ChangeTimeouts(c.controller, opts) // TODO handle reconfiguration of queue size in the pool
c.Pool.ChangeOptions(c.controller, opts) // TODO handle reconfiguration of queue size in the pool
c.continueCreateComponents()

proposal, _ := c.checkpoint.Get()
Expand Down

0 comments on commit fd3c35e

Please sign in to comment.