Skip to content

Commit

Permalink
[FABG-761] Pluggable peer resolvers for event client
Browse files Browse the repository at this point in the history
The event client now allows you to specify which peer
resolver to use when selecting peers to connect to. To
peer resolver also decides when to disconnect to from
the connected peer.

Also added a vanilla peer resolver that simply load
balances between peers when choosing the initial peer
to connect to.

Change-Id: Ia0ef4db4faa2c2ea9c2535ac8ccaee7cfbe6c238
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Sep 12, 2018
1 parent c8fd21d commit c3753ac
Show file tree
Hide file tree
Showing 20 changed files with 702 additions and 292 deletions.
7 changes: 4 additions & 3 deletions pkg/common/providers/fab/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ type EventServiceConfig interface {
// affecting performance.
ReconnectBlockHeightLagThreshold() int

// BlockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this
// value is only relevant if reconnectBlockHeightLagThreshold >0.
BlockHeightMonitorPeriod() time.Duration
// PeerMonitorPeriod is the period in which the connected peer is monitored to see if
// the event client should disconnect from it and reconnect to another peer.
// If set to 0 then the peer will not be monitored and will not be disconnected.
PeerMonitorPeriod() time.Duration
}

// TimeoutType enumerates the different types of outgoing connections
Expand Down
12 changes: 6 additions & 6 deletions pkg/core/config/testdata/template/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,17 @@ client:
# # reconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's
# # block height falls behind the specified number of blocks and will reconnect to a better performing peer.
# # If set to 0 then this feature is disabled.
# # Default: 0 (disabled)
# # Default: 0
# # NOTES:
# # - This feature should only be enabled when using deliver events, otherwise events may be lost
# # - peerMonitorPeriod must be >0 to enable this feature
# # - Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby
# # affecting performance.
# reconnectBlockHeightLagThreshold: 0
#
# # blockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this
# # value is only relevant if reconnectBlockHeightLagThreshold >0.
# # Default: 5s
# blockHeightMonitorPeriod: 5s
# # peerMonitorPeriod is the period in which the connected peer is monitored to see if
# # the event client should disconnect from it and reconnect to another peer.
# # Default: 0 (disabled)
# peerMonitorPeriod: 5s

# the below timeouts are commented out to use the default values that are found in
# "pkg/fab/endpointconfig.go"
Expand Down
16 changes: 6 additions & 10 deletions pkg/fab/endpointconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ const (
defaultSelectionRefreshInterval = time.Second * 5
defaultCacheSweepInterval = time.Second * 15

defaultBlockHeightLagThreshold = 5
defaultBlockHeightMonitorPeriod = 5 * time.Second
defaultBlockHeightLagThreshold = 5

//default grpc opts
defaultKeepAliveTime = 0
Expand Down Expand Up @@ -1684,14 +1683,11 @@ func (c *EventServiceConfig) ReconnectBlockHeightLagThreshold() int {
return c.backend.GetInt("client.eventService.reconnectBlockHeightLagThreshold")
}

// BlockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this
// value is only relevant if reconnectBlockHeightLagThreshold >0.
func (c *EventServiceConfig) BlockHeightMonitorPeriod() time.Duration {
period := c.backend.GetDuration("client.eventService.blockHeightMonitorPeriod")
if period == 0 {
return defaultBlockHeightMonitorPeriod
}
return period
// PeerMonitorPeriod is the period in which the connected peer is monitored to see if
// the event client should disconnect from it and reconnect to another peer.
// A value of 0 (default) means that the peer will not be monitored.
func (c *EventServiceConfig) PeerMonitorPeriod() time.Duration {
return c.backend.GetDuration("client.eventService.peerMonitorPeriod")
}

//peerChannelConfigHookFunc returns hook function for unmarshalling 'fab.PeerChannelConfig'
Expand Down
4 changes: 2 additions & 2 deletions pkg/fab/endpointconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,15 @@ func TestEventServiceConfig(t *testing.T) {
customBackend.KeyValueMap["client.eventService.type"] = "deliver"
customBackend.KeyValueMap["client.eventService.blockHeightLagThreshold"] = "4"
customBackend.KeyValueMap["client.eventService.reconnectBlockHeightLagThreshold"] = "7"
customBackend.KeyValueMap["client.eventService.blockHeightMonitorPeriod"] = "7s"
customBackend.KeyValueMap["client.eventService.peerMonitorPeriod"] = "7s"

endpointConfig, err := ConfigFromBackend(customBackend)
require.NoError(t, err)

eventServiceConfig := endpointConfig.EventServiceConfig()
assert.Equalf(t, 4, eventServiceConfig.BlockHeightLagThreshold(), "invalid value for blockHeightLagThreshold")
assert.Equalf(t, 7, eventServiceConfig.ReconnectBlockHeightLagThreshold(), "invalid value for reconnectBlockHeightLagThreshold")
assert.Equalf(t, 7*time.Second, eventServiceConfig.BlockHeightMonitorPeriod(), "invalid value for blockHeightMonitorPeriod")
assert.Equalf(t, 7*time.Second, eventServiceConfig.PeerMonitorPeriod(), "invalid value for peerMonitorPeriod")
}

func checkTimeouts(endpointConfig fab.EndpointConfig, t *testing.T, errStr string) {
Expand Down
17 changes: 10 additions & 7 deletions pkg/fab/events/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,26 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp"

"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/util/test"
"github.com/stretchr/testify/assert"

"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher"
clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks"
mockconn "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver/minblockheight"
esdispatcher "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher"
servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks"
fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp"
"github.com/hyperledger/fabric-sdk-go/pkg/util/test"
cb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
Expand Down Expand Up @@ -1424,9 +1425,11 @@ func TestDisconnectIfBlockHeightLags(t *testing.T) {
WithTimeBetweenConnectAttempts(time.Millisecond),
WithConnectionEvent(connectch),
WithResponseTimeout(2 * time.Second),
dispatcher.WithBlockHeightLagThreshold(2),
dispatcher.WithReconnectBlockHeightThreshold(3),
dispatcher.WithBlockHeightMonitorPeriod(250 * time.Millisecond),
dispatcher.WithPeerResolver(minblockheight.NewResolver()),
dispatcher.WithLoadBalancePolicy(lbp.NewRoundRobin()),
dispatcher.WithPeerMonitorPeriod(250 * time.Millisecond),
minblockheight.WithBlockHeightLagThreshold(2),
minblockheight.WithReconnectBlockHeightThreshold(3),
},
)
if err != nil {
Expand Down
Loading

0 comments on commit c3753ac

Please sign in to comment.