From c3753acd298e4f1407db928e3265959738753c94 Mon Sep 17 00:00:00 2001 From: Bob Stasyszyn Date: Wed, 12 Sep 2018 13:37:57 -0400 Subject: [PATCH] [FABG-761] Pluggable peer resolvers for event client The event client now allows you to specify which peer resolver to use when selecting peers to connect to. To peer resolver also decides when to disconnect to from the connected peer. Also added a vanilla peer resolver that simply load balances between peers when choosing the initial peer to connect to. Change-Id: Ia0ef4db4faa2c2ea9c2535ac8ccaee7cfbe6c238 Signed-off-by: Bob Stasyszyn --- pkg/common/providers/fab/provider.go | 7 +- pkg/core/config/testdata/template/config.yaml | 12 +- pkg/fab/endpointconfig.go | 16 +- pkg/fab/endpointconfig_test.go | 4 +- pkg/fab/events/client/client_test.go | 17 +- .../events/client/dispatcher/dispatcher.go | 197 ++++-------------- .../client/dispatcher/dispatcher_test.go | 31 +-- pkg/fab/events/client/dispatcher/opts.go | 100 +++------ pkg/fab/events/client/mocks/mockdispatcher.go | 28 +++ .../client/peerresolver/balanced/balanced.go | 52 +++++ .../peerresolver/balanced/balanced_test.go | 52 +++++ .../client/peerresolver/balanced/opts.go | 27 +++ .../minblockheight/minblockheight.go | 187 +++++++++++++++++ .../minblockheight/minblockheight_test.go | 121 +++++++++++ .../peerresolver/minblockheight/opts.go | 91 ++++++++ .../client/peerresolver/peerresolver.go | 25 +++ pkg/fab/mocks/mockconfig.go | 10 +- pkg/fab/opts_test.go | 2 +- test/fixtures/config/config_test.yaml | 13 +- .../endpointconfig_override_test.go | 2 +- 20 files changed, 702 insertions(+), 292 deletions(-) create mode 100644 pkg/fab/events/client/mocks/mockdispatcher.go create mode 100644 pkg/fab/events/client/peerresolver/balanced/balanced.go create mode 100644 pkg/fab/events/client/peerresolver/balanced/balanced_test.go create mode 100644 pkg/fab/events/client/peerresolver/balanced/opts.go create mode 100644 pkg/fab/events/client/peerresolver/minblockheight/minblockheight.go create mode 100644 pkg/fab/events/client/peerresolver/minblockheight/minblockheight_test.go create mode 100644 pkg/fab/events/client/peerresolver/minblockheight/opts.go create mode 100644 pkg/fab/events/client/peerresolver/peerresolver.go diff --git a/pkg/common/providers/fab/provider.go b/pkg/common/providers/fab/provider.go index c72a652298..d1b3af2cc1 100644 --- a/pkg/common/providers/fab/provider.go +++ b/pkg/common/providers/fab/provider.go @@ -124,9 +124,10 @@ type EventServiceConfig interface { // affecting performance. ReconnectBlockHeightLagThreshold() int - // BlockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this - // value is only relevant if reconnectBlockHeightLagThreshold >0. - BlockHeightMonitorPeriod() time.Duration + // PeerMonitorPeriod is the period in which the connected peer is monitored to see if + // the event client should disconnect from it and reconnect to another peer. + // If set to 0 then the peer will not be monitored and will not be disconnected. + PeerMonitorPeriod() time.Duration } // TimeoutType enumerates the different types of outgoing connections diff --git a/pkg/core/config/testdata/template/config.yaml b/pkg/core/config/testdata/template/config.yaml index ed1af97395..173af2e66a 100755 --- a/pkg/core/config/testdata/template/config.yaml +++ b/pkg/core/config/testdata/template/config.yaml @@ -58,17 +58,17 @@ client: # # reconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's # # block height falls behind the specified number of blocks and will reconnect to a better performing peer. # # If set to 0 then this feature is disabled. -# # Default: 0 (disabled) +# # Default: 0 # # NOTES: -# # - This feature should only be enabled when using deliver events, otherwise events may be lost +# # - peerMonitorPeriod must be >0 to enable this feature # # - Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby # # affecting performance. # reconnectBlockHeightLagThreshold: 0 # -# # blockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this -# # value is only relevant if reconnectBlockHeightLagThreshold >0. -# # Default: 5s -# blockHeightMonitorPeriod: 5s +# # peerMonitorPeriod is the period in which the connected peer is monitored to see if +# # the event client should disconnect from it and reconnect to another peer. +# # Default: 0 (disabled) +# peerMonitorPeriod: 5s # the below timeouts are commented out to use the default values that are found in # "pkg/fab/endpointconfig.go" diff --git a/pkg/fab/endpointconfig.go b/pkg/fab/endpointconfig.go index 7aa649ece0..7be3bde575 100644 --- a/pkg/fab/endpointconfig.go +++ b/pkg/fab/endpointconfig.go @@ -53,8 +53,7 @@ const ( defaultSelectionRefreshInterval = time.Second * 5 defaultCacheSweepInterval = time.Second * 15 - defaultBlockHeightLagThreshold = 5 - defaultBlockHeightMonitorPeriod = 5 * time.Second + defaultBlockHeightLagThreshold = 5 //default grpc opts defaultKeepAliveTime = 0 @@ -1684,14 +1683,11 @@ func (c *EventServiceConfig) ReconnectBlockHeightLagThreshold() int { return c.backend.GetInt("client.eventService.reconnectBlockHeightLagThreshold") } -// BlockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this -// value is only relevant if reconnectBlockHeightLagThreshold >0. -func (c *EventServiceConfig) BlockHeightMonitorPeriod() time.Duration { - period := c.backend.GetDuration("client.eventService.blockHeightMonitorPeriod") - if period == 0 { - return defaultBlockHeightMonitorPeriod - } - return period +// PeerMonitorPeriod is the period in which the connected peer is monitored to see if +// the event client should disconnect from it and reconnect to another peer. +// A value of 0 (default) means that the peer will not be monitored. +func (c *EventServiceConfig) PeerMonitorPeriod() time.Duration { + return c.backend.GetDuration("client.eventService.peerMonitorPeriod") } //peerChannelConfigHookFunc returns hook function for unmarshalling 'fab.PeerChannelConfig' diff --git a/pkg/fab/endpointconfig_test.go b/pkg/fab/endpointconfig_test.go index 84ac1524f6..e2044c7427 100644 --- a/pkg/fab/endpointconfig_test.go +++ b/pkg/fab/endpointconfig_test.go @@ -197,7 +197,7 @@ func TestEventServiceConfig(t *testing.T) { customBackend.KeyValueMap["client.eventService.type"] = "deliver" customBackend.KeyValueMap["client.eventService.blockHeightLagThreshold"] = "4" customBackend.KeyValueMap["client.eventService.reconnectBlockHeightLagThreshold"] = "7" - customBackend.KeyValueMap["client.eventService.blockHeightMonitorPeriod"] = "7s" + customBackend.KeyValueMap["client.eventService.peerMonitorPeriod"] = "7s" endpointConfig, err := ConfigFromBackend(customBackend) require.NoError(t, err) @@ -205,7 +205,7 @@ func TestEventServiceConfig(t *testing.T) { eventServiceConfig := endpointConfig.EventServiceConfig() assert.Equalf(t, 4, eventServiceConfig.BlockHeightLagThreshold(), "invalid value for blockHeightLagThreshold") assert.Equalf(t, 7, eventServiceConfig.ReconnectBlockHeightLagThreshold(), "invalid value for reconnectBlockHeightLagThreshold") - assert.Equalf(t, 7*time.Second, eventServiceConfig.BlockHeightMonitorPeriod(), "invalid value for blockHeightMonitorPeriod") + assert.Equalf(t, 7*time.Second, eventServiceConfig.PeerMonitorPeriod(), "invalid value for peerMonitorPeriod") } func checkTimeouts(endpointConfig fab.EndpointConfig, t *testing.T, errStr string) { diff --git a/pkg/fab/events/client/client_test.go b/pkg/fab/events/client/client_test.go index e6094e7c64..bccc82f607 100755 --- a/pkg/fab/events/client/client_test.go +++ b/pkg/fab/events/client/client_test.go @@ -15,25 +15,26 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp" "github.com/hyperledger/fabric-sdk-go/pkg/common/options" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" - "github.com/hyperledger/fabric-sdk-go/pkg/util/test" - "github.com/stretchr/testify/assert" - "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks" mockconn "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver/minblockheight" esdispatcher "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher" servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks" fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp" + "github.com/hyperledger/fabric-sdk-go/pkg/util/test" cb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common" pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -1424,9 +1425,11 @@ func TestDisconnectIfBlockHeightLags(t *testing.T) { WithTimeBetweenConnectAttempts(time.Millisecond), WithConnectionEvent(connectch), WithResponseTimeout(2 * time.Second), - dispatcher.WithBlockHeightLagThreshold(2), - dispatcher.WithReconnectBlockHeightThreshold(3), - dispatcher.WithBlockHeightMonitorPeriod(250 * time.Millisecond), + dispatcher.WithPeerResolver(minblockheight.NewResolver()), + dispatcher.WithLoadBalancePolicy(lbp.NewRoundRobin()), + dispatcher.WithPeerMonitorPeriod(250 * time.Millisecond), + minblockheight.WithBlockHeightLagThreshold(2), + minblockheight.WithReconnectBlockHeightThreshold(3), }, ) if err != nil { diff --git a/pkg/fab/events/client/dispatcher/dispatcher.go b/pkg/fab/events/client/dispatcher/dispatcher.go index cca41c74a8..7ebf1b08d4 100755 --- a/pkg/fab/events/client/dispatcher/dispatcher.go +++ b/pkg/fab/events/client/dispatcher/dispatcher.go @@ -8,16 +8,15 @@ package dispatcher import ( "fmt" - "math" "sync" "time" - "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" - "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" - "github.com/hyperledger/fabric-sdk-go/pkg/common/logging" "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver" esdispatcher "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher" "github.com/pkg/errors" ) @@ -37,17 +36,18 @@ type Dispatcher struct { connectionRegistration *ConnectionReg connectionProvider api.ConnectionProvider discoveryService fab.DiscoveryService - monitorBlockHeightDone chan struct{} + peerResolver peerresolver.Resolver + peerMonitorDone chan struct{} peer fab.Peer lock sync.RWMutex } // New creates a new dispatcher func New(context context.Client, chConfig fab.ChannelCfg, discoveryService fab.DiscoveryService, connectionProvider api.ConnectionProvider, opts ...options.Opt) *Dispatcher { - params := defaultParams(context.EndpointConfig().EventServiceConfig()) + params := defaultParams(context) options.Apply(params, opts) - return &Dispatcher{ + dispatcher := &Dispatcher{ Dispatcher: *esdispatcher.New(opts...), params: *params, context: context, @@ -55,6 +55,9 @@ func New(context context.Client, chConfig fab.ChannelCfg, discoveryService fab.D discoveryService: discoveryService, connectionProvider: connectionProvider, } + dispatcher.peerResolver = params.peerResolverProvider(dispatcher, context, opts...) + + return dispatcher } // Start starts the dispatcher @@ -83,9 +86,9 @@ func (ed *Dispatcher) HandleStopEvent(e esdispatcher.Event) { // Remove all registrations and close the associated event channels // so that the client is notified that the registration has been removed ed.clearConnectionRegistration() - if ed.monitorBlockHeightDone != nil { - close(ed.monitorBlockHeightDone) - ed.monitorBlockHeightDone = nil + if ed.peerMonitorDone != nil { + close(ed.peerMonitorDone) + ed.peerMonitorDone = nil } ed.Dispatcher.HandleStopEvent(e) @@ -118,7 +121,7 @@ func (ed *Dispatcher) HandleConnectEvent(e esdispatcher.Event) { return } - peer, err := ed.loadBalancePolicy.Choose(ed.filterByBlockHeght(peers)) + peer, err := ed.peerResolver.Resolve(peers) if err != nil { evt.ErrCh <- err return @@ -184,9 +187,9 @@ func (ed *Dispatcher) HandleConnectedEvent(e esdispatcher.Event) { } } - if ed.reconnectBlockHeightLagThreshold > 0 { - ed.monitorBlockHeightDone = make(chan struct{}) - go ed.monitorBlockHeight(ed.monitorBlockHeightDone) + if ed.peerMonitorPeriod > 0 { + ed.peerMonitorDone = make(chan struct{}) + go ed.monitorPeer(ed.peerMonitorDone) } } @@ -212,9 +215,9 @@ func (ed *Dispatcher) HandleDisconnectedEvent(e esdispatcher.Event) { logger.Warnf("Disconnected from event server: %s", evt.Err) } - if ed.monitorBlockHeightDone != nil { - close(ed.monitorBlockHeightDone) - ed.monitorBlockHeightDone = nil + if ed.peerMonitorDone != nil { + close(ed.peerMonitorDone) + ed.peerMonitorDone = nil } } @@ -238,105 +241,18 @@ func (ed *Dispatcher) clearConnectionRegistration() { } } -func (ed *Dispatcher) filterByBlockHeght(peers []fab.Peer) []fab.Peer { - var minBlockHeight uint64 - if ed.minBlockHeight > 0 { - if ed.LastBlockNum() != math.MaxUint64 { - // No blocks received yet - logger.Debugf("Min block height was specified: %d", ed.minBlockHeight) - minBlockHeight = ed.minBlockHeight - } else { - // Make sure minBlockHeight is greater than the last block received - if ed.minBlockHeight > ed.LastBlockNum() { - minBlockHeight = ed.minBlockHeight - } else { - minBlockHeight = ed.LastBlockNum() + 1 - logger.Debugf("Min block height was specified as %d but the last block received was %d. Setting min height to %d", ed.minBlockHeight, ed.LastBlockNum(), minBlockHeight) - } - } - } - - retPeers := ed.doFilterByBlockHeght(minBlockHeight, peers) - if len(retPeers) == 0 && minBlockHeight > 0 { - // The last block that was received may have been the last block in the channel. Try again with lastBlock-1. - logger.Infof("No peers at the minimum height %d. Trying again with min height %d ...", minBlockHeight, minBlockHeight-1) - minBlockHeight-- - retPeers = ed.doFilterByBlockHeght(minBlockHeight, peers) - if len(retPeers) == 0 { - // No peers at the given height. Try again without min height - logger.Infof("No peers at the minimum height %d. Trying again without min height ...", minBlockHeight) - retPeers = ed.doFilterByBlockHeght(0, peers) - } - } - - return retPeers -} - -func (ed *Dispatcher) doFilterByBlockHeght(minBlockHeight uint64, peers []fab.Peer) []fab.Peer { - var cutoffHeight uint64 - if minBlockHeight > 0 { - logger.Debugf("Setting cutoff height to be min block height: %d ...", minBlockHeight) - cutoffHeight = minBlockHeight - } else { - if ed.blockHeightLagThreshold < 0 || len(peers) == 1 { - logger.Debugf("Returning all peers") - return peers - } - - maxHeight := getMaxBlockHeight(peers) - logger.Debugf("Max block height of peers: %d", maxHeight) - - if maxHeight <= uint64(ed.blockHeightLagThreshold) { - logger.Debugf("Max block height of peers is %d and lag threshold is %d so returning all peers", maxHeight, ed.blockHeightLagThreshold) - return peers - } - cutoffHeight = maxHeight - uint64(ed.blockHeightLagThreshold) - } - - logger.Debugf("Choosing peers whose block heights are at least the cutoff height %d ...", cutoffHeight) - - var retPeers []fab.Peer - for _, p := range peers { - peerState, ok := p.(fab.PeerState) - if !ok { - logger.Debugf("Accepting peer [%s] since it does not have state (may be a local peer)", p.URL()) - retPeers = append(retPeers, p) - } else if peerState.BlockHeight() >= cutoffHeight { - logger.Debugf("Accepting peer [%s] at block height %d which is greater than or equal to the cutoff %d", p.URL(), peerState.BlockHeight(), cutoffHeight) - retPeers = append(retPeers, p) - } else { - logger.Debugf("Rejecting peer [%s] at block height %d which is less than the cutoff %d", p.URL(), peerState.BlockHeight(), cutoffHeight) - } - } - return retPeers -} - -func getMaxBlockHeight(peers []fab.Peer) uint64 { - var maxHeight uint64 - for _, peer := range peers { - peerState, ok := peer.(fab.PeerState) - if ok { - blockHeight := peerState.BlockHeight() - if blockHeight > maxHeight { - maxHeight = blockHeight - } - } - } - return maxHeight -} - -func (ed *Dispatcher) monitorBlockHeight(done chan struct{}) { - logger.Debugf("Starting block height monitor on channel [%s]. Lag threshold: %d", ed.chConfig.ID(), ed.reconnectBlockHeightLagThreshold) +func (ed *Dispatcher) monitorPeer(done chan struct{}) { + logger.Infof("Starting peer monitor on channel [%s]", ed.chConfig.ID()) - ticker := time.NewTicker(ed.blockHeightMonitorPeriod) + ticker := time.NewTicker(ed.peerMonitorPeriod) defer ticker.Stop() for { select { case <-ticker.C: - if !ed.checkBlockHeight() { + if ed.disconnected() { // Disconnected - logger.Debugf("Client on channel [%s] has disconnected - stopping block height monitor", ed.chConfig.ID()) + logger.Debugf("Client on channel [%s] has disconnected - stopping disconnect monitor", ed.chConfig.ID()) return } case <-done: @@ -346,65 +262,37 @@ func (ed *Dispatcher) monitorBlockHeight(done chan struct{}) { } } -// checkBlockHeight checks the current peer's block height relative to the block heights of the -// other peers in the channel and disconnects the peer if the configured threshold is reached. -// Returns true if the block height is acceptable; false if the client has been disconnected from the peer -func (ed *Dispatcher) checkBlockHeight() bool { - logger.Debugf("Checking block heights on channel [%s]...", ed.chConfig.ID()) - - connectedPeer := ed.connectedPeer() +// disconnected checks if the currently connected peer should be disconnected +// Returns true if the client has been disconnected; false otherwise +func (ed *Dispatcher) disconnected() bool { + connectedPeer := ed.ConnectedPeer() if connectedPeer == nil { logger.Debugf("Not connected yet") - return true + return false } - peerState, ok := connectedPeer.(fab.PeerState) - if !ok { - logger.Debugf("Peer does not contain state") - return true - } - - lastBlockReceived := ed.LastBlockNum() - connectedPeerBlockHeight := peerState.BlockHeight() + logger.Debugf("Checking if event client should disconnect from peer [%s] on channel [%s]...", connectedPeer.URL(), ed.chConfig.ID()) peers, err := ed.discoveryService.GetPeers() if err != nil { - logger.Warnf("Error checking block height on peers: %s", err) - return true - } - - maxHeight := getMaxBlockHeight(peers) - - logger.Debugf("Block height on channel [%s] of connected peer [%s] from Discovery: %d, Last block received: %d, Max block height from Discovery: %d", ed.chConfig.ID(), connectedPeer.URL(), connectedPeerBlockHeight, lastBlockReceived, maxHeight) - - if maxHeight <= uint64(ed.reconnectBlockHeightLagThreshold) { - logger.Debugf("Max block height on channel [%s] of peers is %d and reconnect lag threshold is %d so event client will not be disconnected from peer", ed.chConfig.ID(), maxHeight, ed.reconnectBlockHeightLagThreshold) - return true + logger.Warnf("Error calling peer resolver: %s", err) + return false } - // The last block received may be lagging the actual block height of the peer - if lastBlockReceived+1 < connectedPeerBlockHeight { - // We can still get more blocks from the connected peer. Don't disconnect - logger.Debugf("Block height on channel [%s] of connected peer [%s] from Discovery is %d which is greater than last block received+1: %d. Won't disconnect from this peer since more blocks can still be retrieved from the peer", ed.chConfig.ID(), connectedPeer.URL(), connectedPeerBlockHeight, lastBlockReceived+1) - return true + if !ed.peerResolver.ShouldDisconnect(peers, connectedPeer) { + logger.Debugf("Event client will not disconnect from peer [%s] on channel [%s]...", connectedPeer.URL(), ed.chConfig.ID()) + return false } - cutoffHeight := maxHeight - uint64(ed.reconnectBlockHeightLagThreshold) - peerBlockHeight := lastBlockReceived + 1 - - if peerBlockHeight >= cutoffHeight { - logger.Debugf("Block height on channel [%s] from connected peer [%s] is %d which is greater than or equal to the cutoff %d so event client will not be disconnected from peer", ed.chConfig.ID(), connectedPeer.URL(), peerBlockHeight, cutoffHeight) - return true - } + logger.Warnf("The peer resolver determined that the event client should be disconnected from connected peer [%s] on channel [%s]. Disconnecting ...", connectedPeer.URL(), ed.chConfig.ID()) - logger.Infof("Block height on channel [%s] from connected peer is %d which is less than the cutoff %d. Disconnecting from the peer...", ed.chConfig.ID(), peerBlockHeight, cutoffHeight) if err := ed.disconnect(); err != nil { - logger.Warnf("Error disconnecting event client from channel [%s]: %s", ed.chConfig.ID(), err) - return true + logger.Warnf("Error disconnecting event client from peer [%s] on channel [%s]: %s", connectedPeer.URL(), ed.chConfig.ID(), err) + return false } - logger.Info("Successfully disconnected event client from channel [%s]", ed.chConfig.ID()) - return false + logger.Warnf("Successfully disconnected event client from peer [%s] on channel [%s]", connectedPeer.URL(), ed.chConfig.ID()) + return true } func (ed *Dispatcher) disconnect() error { @@ -431,7 +319,8 @@ func (ed *Dispatcher) setConnectedPeer(peer fab.Peer) { ed.peer = peer } -func (ed *Dispatcher) connectedPeer() fab.Peer { +// ConnectedPeer returns the connected peer +func (ed *Dispatcher) ConnectedPeer() fab.Peer { ed.lock.RLock() defer ed.lock.RUnlock() return ed.peer diff --git a/pkg/fab/events/client/dispatcher/dispatcher_test.go b/pkg/fab/events/client/dispatcher/dispatcher_test.go index 97a94f2c0d..de11cb0834 100755 --- a/pkg/fab/events/client/dispatcher/dispatcher_test.go +++ b/pkg/fab/events/client/dispatcher/dispatcher_test.go @@ -12,8 +12,8 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp" - clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver/minblockheight" esdispatcher "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher" servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks" fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" @@ -47,7 +47,7 @@ func TestConnect(t *testing.T) { ), ), WithLoadBalancePolicy(lbp.NewRandom()), - WithBlockHeightLagThreshold(5), + minblockheight.WithBlockHeightLagThreshold(5), ) if dispatcher.ChannelConfig().ID() != channelID { @@ -220,26 +220,6 @@ func TestConnectionEvent(t *testing.T) { } } -func TestFilterByBlockHeight(t *testing.T) { - dispatcher := &Dispatcher{} - - dispatcher.blockHeightLagThreshold = -1 - filteredPeers := dispatcher.filterByBlockHeght([]fab.Peer{peer1, peer2, peer3}) - assert.Equal(t, 3, len(filteredPeers)) - - dispatcher.blockHeightLagThreshold = 0 - filteredPeers = dispatcher.filterByBlockHeght([]fab.Peer{peer1, peer2, peer3}) - assert.Equal(t, 1, len(filteredPeers)) - - dispatcher.blockHeightLagThreshold = 5 - filteredPeers = dispatcher.filterByBlockHeght([]fab.Peer{peer1, peer2, peer3}) - assert.Equal(t, 2, len(filteredPeers)) - - dispatcher.blockHeightLagThreshold = 20 - filteredPeers = dispatcher.filterByBlockHeght([]fab.Peer{peer1, peer2, peer3}) - assert.Equal(t, 3, len(filteredPeers)) -} - func TestDisconnectIfBlockHeightLags(t *testing.T) { p1 := clientmocks.NewMockPeer("peer1", "grpcs://peer1.example.com:7051", 4) p2 := clientmocks.NewMockPeer("peer2", "grpcs://peer2.example.com:7051", 1) @@ -260,9 +240,10 @@ func TestDisconnectIfBlockHeightLags(t *testing.T) { ), ), ), - WithBlockHeightLagThreshold(2), - WithReconnectBlockHeightThreshold(3), - WithBlockHeightMonitorPeriod(250*time.Millisecond), + WithPeerResolver(minblockheight.NewResolver()), + WithPeerMonitorPeriod(250*time.Millisecond), + minblockheight.WithBlockHeightLagThreshold(2), + minblockheight.WithReconnectBlockHeightThreshold(3), ) if err := dispatcher.Start(); err != nil { diff --git a/pkg/fab/events/client/dispatcher/opts.go b/pkg/fab/events/client/dispatcher/opts.go index 7c33aaa0f9..1c62942e0e 100755 --- a/pkg/fab/events/client/dispatcher/opts.go +++ b/pkg/fab/events/client/dispatcher/opts.go @@ -10,24 +10,23 @@ import ( "time" "github.com/hyperledger/fabric-sdk-go/pkg/common/options" - "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver/minblockheight" ) type params struct { - loadBalancePolicy lbp.LoadBalancePolicy - minBlockHeight uint64 - blockHeightMonitorPeriod time.Duration - blockHeightLagThreshold int - reconnectBlockHeightLagThreshold int + peerMonitorPeriod time.Duration + peerResolverProvider peerresolver.Provider } -func defaultParams(config fab.EventServiceConfig) *params { +func defaultParams(context context.Client) *params { + config := context.EndpointConfig().EventServiceConfig() + return ¶ms{ - loadBalancePolicy: lbp.NewRoundRobin(), - blockHeightMonitorPeriod: config.BlockHeightMonitorPeriod(), - blockHeightLagThreshold: config.BlockHeightLagThreshold(), - reconnectBlockHeightLagThreshold: config.ReconnectBlockHeightLagThreshold(), + peerMonitorPeriod: config.PeerMonitorPeriod(), + peerResolverProvider: minblockheight.NewResolver(), } } @@ -41,37 +40,21 @@ func WithLoadBalancePolicy(value lbp.LoadBalancePolicy) options.Opt { } } -// WithBlockHeightLagThreshold sets the block height lag threshold. If a peer is lagging behind -// the most up-to-date peer by more than the given number of blocks then it will be excluded. -// If set to 0 then only the most up-to-date peers are considered. -// If set to -1 then all peers (regardless of block height) are considered for selection. -func WithBlockHeightLagThreshold(value int) options.Opt { - return func(p options.Params) { - if setter, ok := p.(blockHeightLagThresholdSetter); ok { - setter.SetBlockHeightLagThreshold(value) - } - } -} - -// WithReconnectBlockHeightThreshold indicates that the event client is to disconnect from the peer if the peer's -// block height falls too far behind the other peers. If the connected peer lags more than the given number of blocks -// then the client will disconnect from that peer and reconnect to another peer at a more acceptable block height. -// If set to 0 then this feature is disabled. -// NOTE: Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby affecting -// performance. -func WithReconnectBlockHeightThreshold(value int) options.Opt { +// WithPeerMonitorPeriod is the period with which the connected peer is monitored +// to see whether or not it should be disconnected. +func WithPeerMonitorPeriod(value time.Duration) options.Opt { return func(p options.Params) { - if setter, ok := p.(reconnectBlockHeightLagThresholdSetter); ok { - setter.SetReconnectBlockHeightLagThreshold(value) + if setter, ok := p.(peerMonitorPeriodSetter); ok { + setter.SetPeerMonitorPeriod(value) } } } -// WithBlockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. -func WithBlockHeightMonitorPeriod(value time.Duration) options.Opt { +// WithPeerResolver sets the peer resolver that chooses the peer from a discovered list of peers. +func WithPeerResolver(value peerresolver.Provider) options.Opt { return func(p options.Params) { - if setter, ok := p.(blockHeightMonitorPeriodSetter); ok { - setter.SetBlockHeightMonitorPeriod(value) + if setter, ok := p.(peerResolverSetter); ok { + setter.SetPeerResolver(value) } } } @@ -80,45 +63,20 @@ type loadBalancePolicySetter interface { SetLoadBalancePolicy(value lbp.LoadBalancePolicy) } -func (p *params) SetLoadBalancePolicy(value lbp.LoadBalancePolicy) { - logger.Debugf("LoadBalancePolicy: %#v", value) - p.loadBalancePolicy = value -} - -type blockHeightLagThresholdSetter interface { - SetBlockHeightLagThreshold(value int) -} - -func (p *params) SetBlockHeightLagThreshold(value int) { - logger.Debugf("BlockHeightLagThreshold: %d", value) - p.blockHeightLagThreshold = value -} - -type reconnectBlockHeightLagThresholdSetter interface { - SetReconnectBlockHeightLagThreshold(value int) -} - -func (p *params) SetReconnectBlockHeightLagThreshold(value int) { - logger.Debugf("ReconnectBlockHeightLagThreshold: %d", value) - p.reconnectBlockHeightLagThreshold = value -} - -type blockHeightMonitorPeriodSetter interface { - SetBlockHeightMonitorPeriod(value time.Duration) +type peerMonitorPeriodSetter interface { + SetPeerMonitorPeriod(value time.Duration) } -func (p *params) SetBlockHeightMonitorPeriod(value time.Duration) { - logger.Debugf("BlockHeightMonitorPeriod: %s", value) - p.blockHeightMonitorPeriod = value +func (p *params) SetPeerMonitorPeriod(value time.Duration) { + logger.Debugf("PeerMonitorPeriod: %s", value) + p.peerMonitorPeriod = value } -func (p *params) SetFromBlock(value uint64) { - logger.Debugf("FromBlock: %d", value) - p.minBlockHeight = value + 1 +type peerResolverSetter interface { + SetPeerResolver(value peerresolver.Provider) } -func (p *params) SetSnapshot(value fab.EventSnapshot) error { - logger.Debugf("SetSnapshot.FromBlock: %d", value) - p.minBlockHeight = value.LastBlockReceived() + 1 - return nil +func (p *params) SetPeerResolver(value peerresolver.Provider) { + logger.Debugf("PeerResolver: %#v", value) + p.peerResolverProvider = value } diff --git a/pkg/fab/events/client/mocks/mockdispatcher.go b/pkg/fab/events/client/mocks/mockdispatcher.go new file mode 100644 index 0000000000..91721b171c --- /dev/null +++ b/pkg/fab/events/client/mocks/mockdispatcher.go @@ -0,0 +1,28 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package mocks + +// MockDispatcher is a mock Dispatcher +type MockDispatcher struct { + Error error + LastBlock uint64 +} + +// Start returns the configured error +func (d *MockDispatcher) Start() error { + return d.Error +} + +// EventCh simply returns the configured error +func (d *MockDispatcher) EventCh() (chan<- interface{}, error) { + return nil, d.Error +} + +// LastBlockNum returns the last block number +func (d *MockDispatcher) LastBlockNum() uint64 { + return d.LastBlock +} diff --git a/pkg/fab/events/client/peerresolver/balanced/balanced.go b/pkg/fab/events/client/peerresolver/balanced/balanced.go new file mode 100644 index 0000000000..418451329c --- /dev/null +++ b/pkg/fab/events/client/peerresolver/balanced/balanced.go @@ -0,0 +1,52 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package balanced + +import ( + "github.com/hyperledger/fabric-sdk-go/pkg/common/logging" + "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service" +) + +var logger = logging.NewLogger("fabsdk/fab") + +// PeerResolver is a peer resolver that chooses peers using the provided load balancer. +type PeerResolver struct { + *params +} + +// NewResolver returns a new "balanced" peer resolver provider. +func NewResolver() peerresolver.Provider { + return func(ed service.Dispatcher, context context.Client, opts ...options.Opt) peerresolver.Resolver { + return New(ed, context, opts...) + } +} + +// New returns a new "balanced" peer resolver. +func New(dispatcher service.Dispatcher, context context.Client, opts ...options.Opt) *PeerResolver { + params := defaultParams(context) + options.Apply(params, opts) + + logger.Debugf("Creating new balanced peer resolver") + + return &PeerResolver{ + params: params, + } +} + +// Resolve returns a peer usig the configured load balancer. +func (r *PeerResolver) Resolve(peers []fab.Peer) (fab.Peer, error) { + return r.loadBalancePolicy.Choose(peers) +} + +// ShouldDisconnect always returns false (will not disconnect a connected peer) +func (r *PeerResolver) ShouldDisconnect(peers []fab.Peer, connectedPeer fab.Peer) bool { + return false +} diff --git a/pkg/fab/events/client/peerresolver/balanced/balanced_test.go b/pkg/fab/events/client/peerresolver/balanced/balanced_test.go new file mode 100644 index 0000000000..5ec11acd8c --- /dev/null +++ b/pkg/fab/events/client/peerresolver/balanced/balanced_test.go @@ -0,0 +1,52 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package balanced + +import ( + "testing" + + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp" + + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + org1MSP = "Org1MSP" + p1 = clientmocks.NewMockPeer("peer1", "peer1.example.com:7051", 100) + p2 = clientmocks.NewMockPeer("peer2", "peer2.example.com:7051", 110) + p3 = clientmocks.NewMockPeer("peer3", "peer3.example.com:7051", 111) + peers = []fab.Peer{p1, p2, p3} +) + +func TestResolve(t *testing.T) { + dispatcher := &clientmocks.MockDispatcher{} + ctx := mocks.NewMockContext(mockmsp.NewMockSigningIdentity("test", org1MSP)) + resolver := New(dispatcher, ctx) + resolver.loadBalancePolicy = lbp.NewRoundRobin() + + chosenPeers := make(map[string]struct{}) + for i := 0; i < len(peers); i++ { + peer, err := resolver.Resolve(peers) + require.NoError(t, err) + chosenPeers[peer.URL()] = struct{}{} + } + assert.Equalf(t, 3, len(chosenPeers), "expecting all 3 peers to have been chosen") +} + +func TestShouldDisconnect(t *testing.T) { + dispatcher := &clientmocks.MockDispatcher{LastBlock: 100} + ctx := mocks.NewMockContext(mockmsp.NewMockSigningIdentity("test", org1MSP)) + + resolver := New(dispatcher, ctx) + disconnect := resolver.ShouldDisconnect(peers, p1) + assert.Falsef(t, disconnect, "expecting peer NOT to be disconnected") +} diff --git a/pkg/fab/events/client/peerresolver/balanced/opts.go b/pkg/fab/events/client/peerresolver/balanced/opts.go new file mode 100644 index 0000000000..3d9e880ae0 --- /dev/null +++ b/pkg/fab/events/client/peerresolver/balanced/opts.go @@ -0,0 +1,27 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package balanced + +import ( + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp" +) + +type params struct { + loadBalancePolicy lbp.LoadBalancePolicy +} + +func defaultParams(context context.Client) *params { + return ¶ms{ + loadBalancePolicy: lbp.NewRandom(), + } +} + +func (p *params) SetLoadBalancePolicy(value lbp.LoadBalancePolicy) { + logger.Debugf("LoadBalancePolicy: %#v", value) + p.loadBalancePolicy = value +} diff --git a/pkg/fab/events/client/peerresolver/minblockheight/minblockheight.go b/pkg/fab/events/client/peerresolver/minblockheight/minblockheight.go new file mode 100644 index 0000000000..47558c3a32 --- /dev/null +++ b/pkg/fab/events/client/peerresolver/minblockheight/minblockheight.go @@ -0,0 +1,187 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package minblockheight + +import ( + "math" + + "github.com/hyperledger/fabric-sdk-go/pkg/common/logging" + "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service" +) + +var logger = logging.NewLogger("fabsdk/fab") + +// PeerResolver is a peer resolver that chooses the best peer according to a block height lag threshold. +// The maximum block height of all peers is determined and the peers whose block heights are under +// the maximum height but above a privided "lag" threshold are load balanced. The other peers are +// not considered. +type PeerResolver struct { + *params + dispatcher service.Dispatcher +} + +// NewResolver returns a new "min block height" peer resolver provider. +func NewResolver() peerresolver.Provider { + return func(ed service.Dispatcher, context context.Client, opts ...options.Opt) peerresolver.Resolver { + return New(ed, context, opts...) + } +} + +// New returns a new "min block height" peer resolver. +func New(dispatcher service.Dispatcher, context context.Client, opts ...options.Opt) *PeerResolver { + params := defaultParams(context) + options.Apply(params, opts) + + logger.Debugf("Creating new min block height peer resolver with options: blockHeightLagThreshold: %d, reconnectBlockHeightLagThreshold: %d", params.blockHeightLagThreshold, params.reconnectBlockHeightLagThreshold) + + return &PeerResolver{ + params: params, + dispatcher: dispatcher, + } +} + +// Resolve returns the best peer according to a block height lag threshold. The maximum block height of +// all peers is determined and the peers that are within a privided "lag" threshold are load balanced. +func (r *PeerResolver) Resolve(peers []fab.Peer) (fab.Peer, error) { + return r.loadBalancePolicy.Choose(r.Filter(peers)) +} + +// Filter returns the peers that are within a privided "lag" threshold from the highest block height of all peers. +func (r *PeerResolver) Filter(peers []fab.Peer) []fab.Peer { + var minBlockHeight uint64 + if r.minBlockHeight > 0 { + lastBlockReceived := r.dispatcher.LastBlockNum() + if lastBlockReceived == math.MaxUint64 { + // No blocks received yet + logger.Debugf("Min block height was specified: %d", r.minBlockHeight) + minBlockHeight = r.minBlockHeight + } else { + // Make sure minBlockHeight is greater than the last block received + if r.minBlockHeight > lastBlockReceived { + minBlockHeight = r.minBlockHeight + } else { + minBlockHeight = lastBlockReceived + 1 + logger.Debugf("Min block height was specified as %d but the last block received was %d. Setting min height to %d", r.minBlockHeight, lastBlockReceived, minBlockHeight) + } + } + } + + retPeers := r.doFilterByBlockHeight(minBlockHeight, peers) + if len(retPeers) == 0 && minBlockHeight > 0 { + // The last block that was received may have been the last block in the channel. Try again with lastBlock-1. + logger.Debugf("No peers at the minimum height %d. Trying again with min height %d ...", minBlockHeight, minBlockHeight-1) + minBlockHeight-- + retPeers = r.doFilterByBlockHeight(minBlockHeight, peers) + if len(retPeers) == 0 { + // No peers at the given height. Try again without min height + logger.Debugf("No peers at the minimum height %d. Trying again without min height ...", minBlockHeight) + retPeers = r.doFilterByBlockHeight(0, peers) + } + } + + return retPeers +} + +// ShouldDisconnect checks the current peer's block height relative to the block heights of the +// other peers and disconnects the peer if the configured threshold is reached. +// Returns false if the block height is acceptable; true if the client should be disconnected from the peer +func (r *PeerResolver) ShouldDisconnect(peers []fab.Peer, connectedPeer fab.Peer) bool { + // Check if the peer should be disconnected + peerState, ok := connectedPeer.(fab.PeerState) + if !ok { + logger.Debugf("Peer does not contain state") + return false + } + + lastBlockReceived := r.dispatcher.LastBlockNum() + connectedPeerBlockHeight := peerState.BlockHeight() + + maxHeight := getMaxBlockHeight(peers) + + logger.Debugf("Block height of connected peer [%s] from Discovery: %d, Last block received: %d, Max block height from Discovery: %d", connectedPeer.URL(), connectedPeerBlockHeight, lastBlockReceived, maxHeight) + + if maxHeight <= uint64(r.reconnectBlockHeightLagThreshold) { + logger.Debugf("Max block height of peers is %d and reconnect lag threshold is %d so event client will not be disconnected from peer", maxHeight, r.reconnectBlockHeightLagThreshold) + return false + } + + // The last block received may be lagging the actual block height of the peer + if lastBlockReceived+1 < connectedPeerBlockHeight { + // We can still get more blocks from the connected peer. Don't disconnect + logger.Debugf("Block height of connected peer [%s] from Discovery is %d which is greater than last block received+1: %d. Won't disconnect from this peer since more blocks can still be retrieved from the peer", connectedPeer.URL(), connectedPeerBlockHeight, lastBlockReceived+1) + return false + } + + cutoffHeight := maxHeight - uint64(r.reconnectBlockHeightLagThreshold) + peerBlockHeight := lastBlockReceived + 1 + + if peerBlockHeight >= cutoffHeight { + logger.Debugf("Block height from connected peer [%s] is %d which is greater than or equal to the cutoff %d so event client will not be disconnected from peer", connectedPeer.URL(), peerBlockHeight, cutoffHeight) + return false + } + + logger.Debugf("Block height from connected peer is %d which is less than the cutoff %d. Peer should be disconnected.", peerBlockHeight, cutoffHeight) + + return true +} + +func (r *PeerResolver) doFilterByBlockHeight(minBlockHeight uint64, peers []fab.Peer) []fab.Peer { + var cutoffHeight uint64 + if minBlockHeight > 0 { + logger.Debugf("Setting cutoff height to be min block height: %d ...", minBlockHeight) + cutoffHeight = minBlockHeight + } else { + if r.blockHeightLagThreshold < 0 || len(peers) == 1 { + logger.Debugf("Returning all peers") + return peers + } + + maxHeight := getMaxBlockHeight(peers) + logger.Debugf("Max block height of peers: %d", maxHeight) + + if maxHeight <= uint64(r.blockHeightLagThreshold) { + logger.Debugf("Max block height of peers is %d and lag threshold is %d so returning all peers", maxHeight, r.blockHeightLagThreshold) + return peers + } + cutoffHeight = maxHeight - uint64(r.blockHeightLagThreshold) + } + + logger.Debugf("Choosing peers whose block heights are at least the cutoff height %d ...", cutoffHeight) + + var retPeers []fab.Peer + for _, p := range peers { + peerState, ok := p.(fab.PeerState) + if !ok { + logger.Debugf("Accepting peer [%s] since it does not have state (may be a local peer)", p.URL()) + retPeers = append(retPeers, p) + } else if peerState.BlockHeight() >= cutoffHeight { + logger.Debugf("Accepting peer [%s] at block height %d which is greater than or equal to the cutoff %d", p.URL(), peerState.BlockHeight(), cutoffHeight) + retPeers = append(retPeers, p) + } else { + logger.Debugf("Rejecting peer [%s] at block height %d which is less than the cutoff %d", p.URL(), peerState.BlockHeight(), cutoffHeight) + } + } + return retPeers +} + +func getMaxBlockHeight(peers []fab.Peer) uint64 { + var maxHeight uint64 + for _, peer := range peers { + peerState, ok := peer.(fab.PeerState) + if ok { + blockHeight := peerState.BlockHeight() + if blockHeight > maxHeight { + maxHeight = blockHeight + } + } + } + return maxHeight +} diff --git a/pkg/fab/events/client/peerresolver/minblockheight/minblockheight_test.go b/pkg/fab/events/client/peerresolver/minblockheight/minblockheight_test.go new file mode 100644 index 0000000000..99c8797737 --- /dev/null +++ b/pkg/fab/events/client/peerresolver/minblockheight/minblockheight_test.go @@ -0,0 +1,121 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package minblockheight + +import ( + "math" + "testing" + + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp" + + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + org1MSP = "Org1MSP" + p1 = clientmocks.NewMockPeer("peer1", "peer1.example.com:7051", 100) + p2 = clientmocks.NewMockPeer("peer2", "peer2.example.com:7051", 110) + p3 = clientmocks.NewMockPeer("peer3", "peer3.example.com:7051", 111) + peers = []fab.Peer{p1, p2, p3} +) + +func TestFilter(t *testing.T) { + dispatcher := &clientmocks.MockDispatcher{} + ctx := mocks.NewMockContext(mockmsp.NewMockSigningIdentity("test", org1MSP)) + + resolver := New(dispatcher, ctx, WithBlockHeightLagThreshold(-1)) + filteredPeers := resolver.Filter(peers) + assert.Equal(t, 3, len(filteredPeers)) + + resolver = New(dispatcher, ctx, WithBlockHeightLagThreshold(0)) + filteredPeers = resolver.Filter(peers) + assert.Equal(t, 1, len(filteredPeers)) + + resolver = New(dispatcher, ctx, WithBlockHeightLagThreshold(5)) + filteredPeers = resolver.Filter(peers) + assert.Equal(t, 2, len(filteredPeers)) + + resolver = New(dispatcher, ctx, WithBlockHeightLagThreshold(20)) + filteredPeers = resolver.Filter(peers) + assert.Equal(t, 3, len(filteredPeers)) + + resolver = New(dispatcher, ctx, WithBlockHeightLagThreshold(20)) + resolver.minBlockHeight = 105 + filteredPeers = resolver.Filter(peers) + assert.Equal(t, 2, len(filteredPeers)) + + dispatcher = &clientmocks.MockDispatcher{LastBlock: math.MaxUint64} + resolver = New(dispatcher, ctx, WithBlockHeightLagThreshold(20)) + resolver.minBlockHeight = 105 + filteredPeers = resolver.Filter(peers) + assert.Equal(t, 2, len(filteredPeers)) + + dispatcher = &clientmocks.MockDispatcher{LastBlock: 109} + resolver = New(dispatcher, ctx, WithBlockHeightLagThreshold(20)) + resolver.minBlockHeight = 105 + filteredPeers = resolver.Filter(peers) + assert.Equalf(t, 2, len(filteredPeers), "expecting 2 peers to be returned since minBlockHeight is 105") + + dispatcher = &clientmocks.MockDispatcher{LastBlock: 111} + resolver = New(dispatcher, ctx, WithBlockHeightLagThreshold(20)) + resolver.minBlockHeight = 112 + filteredPeers = resolver.Filter(peers) + assert.Equalf(t, 1, len(filteredPeers), "expecting 1 peer to be returned since minBlockHeight was just 1 under the last block received") + + resolver = New(dispatcher, ctx, WithBlockHeightLagThreshold(20)) + resolver.minBlockHeight = 113 + filteredPeers = resolver.Filter(peers) + assert.Equalf(t, 3, len(filteredPeers), "Expected all peers to be returned since min block height is unrealistic") +} + +func TestResolve(t *testing.T) { + dispatcher := &clientmocks.MockDispatcher{} + ctx := mocks.NewMockContext(mockmsp.NewMockSigningIdentity("test", org1MSP)) + resolver := New(dispatcher, ctx, WithBlockHeightLagThreshold(0)) + peer, err := resolver.Resolve(peers) + require.NoError(t, err) + assert.Equal(t, p3.URL(), peer.URL()) + + resolver = New(dispatcher, ctx, WithBlockHeightLagThreshold(-1)) + resolver.loadBalancePolicy = lbp.NewRoundRobin() + + chosenPeers := make(map[string]struct{}) + for i := 0; i < len(peers); i++ { + peer, err := resolver.Resolve(peers) + require.NoError(t, err) + chosenPeers[peer.URL()] = struct{}{} + } + assert.Equalf(t, 3, len(chosenPeers), "expecting all 3 peers to have been chosen") +} + +func TestShouldDisconnect(t *testing.T) { + dispatcher := &clientmocks.MockDispatcher{LastBlock: 100} + ctx := mocks.NewMockContext(mockmsp.NewMockSigningIdentity("test", org1MSP)) + + resolver := New(dispatcher, ctx, WithReconnectBlockHeightThreshold(120)) + disconnect := resolver.ShouldDisconnect(peers, p1) + assert.Falsef(t, disconnect, "expecting peer NOT to be disconnected since the reconnectBlockHeightThreshold is greater than the maximum block height") + + resolver = New(dispatcher, ctx, WithReconnectBlockHeightThreshold(5)) + disconnect = resolver.ShouldDisconnect(peers, p1) + assert.Truef(t, disconnect, "expecting peer to be disconnected since its block height is lagging more than 5 blocks behind") + + dispatcher = &clientmocks.MockDispatcher{LastBlock: 98} + resolver = New(dispatcher, ctx, WithReconnectBlockHeightThreshold(5)) + disconnect = resolver.ShouldDisconnect(peers, p1) + assert.Falsef(t, disconnect, "expecting peer NOT to be disconnected since the last block received is less than the block height of the peer") + + dispatcher = &clientmocks.MockDispatcher{LastBlock: 110} + resolver = New(dispatcher, ctx, WithReconnectBlockHeightThreshold(5)) + disconnect = resolver.ShouldDisconnect(peers, p3) + assert.Falsef(t, disconnect, "expecting peer NOT to be disconnected since the peer's block height is under the reconnectBlockHeightThreshold") +} diff --git a/pkg/fab/events/client/peerresolver/minblockheight/opts.go b/pkg/fab/events/client/peerresolver/minblockheight/opts.go new file mode 100644 index 0000000000..34a107fa9c --- /dev/null +++ b/pkg/fab/events/client/peerresolver/minblockheight/opts.go @@ -0,0 +1,91 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package minblockheight + +import ( + "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/lbp" +) + +type params struct { + blockHeightLagThreshold int + reconnectBlockHeightLagThreshold int + minBlockHeight uint64 + loadBalancePolicy lbp.LoadBalancePolicy +} + +func defaultParams(context context.Client) *params { + config := context.EndpointConfig().EventServiceConfig() + + return ¶ms{ + blockHeightLagThreshold: config.BlockHeightLagThreshold(), + reconnectBlockHeightLagThreshold: config.ReconnectBlockHeightLagThreshold(), + loadBalancePolicy: lbp.NewRandom(), + } +} + +// WithBlockHeightLagThreshold sets the block height lag threshold. If a peer is lagging behind +// the most up-to-date peer by more than the given number of blocks then it will be excluded. +// If set to 0 then only the most up-to-date peers are considered. +// If set to -1 then all peers (regardless of block height) are considered for selection. +func WithBlockHeightLagThreshold(value int) options.Opt { + return func(p options.Params) { + if setter, ok := p.(blockHeightLagThresholdSetter); ok { + setter.SetBlockHeightLagThreshold(value) + } + } +} + +// WithReconnectBlockHeightThreshold indicates that the event client is to disconnect from the peer if the peer's +// block height falls too far behind the other peers. If the connected peer lags more than the given number of blocks +// then the client will disconnect from that peer and reconnect to another peer at a more acceptable block height. +// If set to 0 then this feature is disabled. +// NOTE: Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby affecting +// performance. +func WithReconnectBlockHeightThreshold(value int) options.Opt { + return func(p options.Params) { + if setter, ok := p.(reconnectBlockHeightLagThresholdSetter); ok { + setter.SetReconnectBlockHeightLagThreshold(value) + } + } +} + +func (p *params) SetLoadBalancePolicy(value lbp.LoadBalancePolicy) { + logger.Debugf("LoadBalancePolicy: %#v", value) + p.loadBalancePolicy = value +} + +type blockHeightLagThresholdSetter interface { + SetBlockHeightLagThreshold(value int) +} + +func (p *params) SetBlockHeightLagThreshold(value int) { + logger.Debugf("BlockHeightLagThreshold: %d", value) + p.blockHeightLagThreshold = value +} + +type reconnectBlockHeightLagThresholdSetter interface { + SetReconnectBlockHeightLagThreshold(value int) +} + +func (p *params) SetReconnectBlockHeightLagThreshold(value int) { + logger.Debugf("ReconnectBlockHeightLagThreshold: %d", value) + p.reconnectBlockHeightLagThreshold = value +} + +func (p *params) SetFromBlock(value uint64) { + logger.Debugf("FromBlock: %d", value) + p.minBlockHeight = value + 1 +} + +func (p *params) SetSnapshot(value fab.EventSnapshot) error { + logger.Debugf("SetSnapshot.FromBlock: %d", value) + p.minBlockHeight = value.LastBlockReceived() + 1 + return nil +} diff --git a/pkg/fab/events/client/peerresolver/peerresolver.go b/pkg/fab/events/client/peerresolver/peerresolver.go new file mode 100644 index 0000000000..6b5242e724 --- /dev/null +++ b/pkg/fab/events/client/peerresolver/peerresolver.go @@ -0,0 +1,25 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package peerresolver + +import ( + "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service" +) + +// Resolver decided which peer to connect to and when to disconnect from that peer +type Resolver interface { + // Resolve chooses a peer from the given set of peers + Resolve(peers []fab.Peer) (fab.Peer, error) + // ShouldDisconnect returns true to disconnect from the connected peer + ShouldDisconnect(peers []fab.Peer, connectedPeer fab.Peer) bool +} + +// Provider creates a peer Resolver +type Provider func(ed service.Dispatcher, context context.Client, opts ...options.Opt) Resolver diff --git a/pkg/fab/mocks/mockconfig.go b/pkg/fab/mocks/mockconfig.go index cc757dbeb1..024c8a1327 100644 --- a/pkg/fab/mocks/mockconfig.go +++ b/pkg/fab/mocks/mockconfig.go @@ -331,7 +331,7 @@ func (c *MockConfig) Lookup(key string) (interface{}, bool) { type MockEventServiceConfig struct { LagThreshold int ReconnectLagThreshold int - HeightMonitorPeriod time.Duration + MonitorPeriod time.Duration } // BlockHeightLagThreshold returns the block height lag threshold. @@ -344,8 +344,8 @@ func (c *MockEventServiceConfig) ReconnectBlockHeightLagThreshold() int { return c.ReconnectLagThreshold } -// BlockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this -// value is only relevant if reconnectBlockHeightLagThreshold >0. -func (c *MockEventServiceConfig) BlockHeightMonitorPeriod() time.Duration { - return c.HeightMonitorPeriod +// PeerMonitorPeriod is the period in which the connected peer is monitored to see whether +// the event client should disconnect and reconnect to another peer. +func (c *MockEventServiceConfig) PeerMonitorPeriod() time.Duration { + return c.MonitorPeriod } diff --git a/pkg/fab/opts_test.go b/pkg/fab/opts_test.go index c705075d2e..9b81d2a1cb 100644 --- a/pkg/fab/opts_test.go +++ b/pkg/fab/opts_test.go @@ -310,7 +310,7 @@ func (m *mockEventServiceConfigImpl) ReconnectBlockHeightLagThreshold() int { return 10 } -func (m *mockEventServiceConfigImpl) BlockHeightMonitorPeriod() time.Duration { +func (m *mockEventServiceConfigImpl) PeerMonitorPeriod() time.Duration { return time.Second } diff --git a/test/fixtures/config/config_test.yaml b/test/fixtures/config/config_test.yaml index e846fe6964..2fdd85830f 100755 --- a/test/fixtures/config/config_test.yaml +++ b/test/fixtures/config/config_test.yaml @@ -40,7 +40,6 @@ client: # # This interval will define how long a peer is greylisted # greylistExpiry: 10s eventService: - # blockHeightLagThreshold sets the block height lag threshold. This value is used for choosing a peer # to connect to. If a peer is lagging behind the most up-to-date peer by more than the given number of # blocks then it will be excluded from selection. @@ -52,17 +51,17 @@ client: # reconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's # block height falls behind the specified number of blocks and will reconnect to a better performing peer. # If set to 0 then this feature is disabled. - # Default: 0 (disabled) + # Default: 0 # NOTES: - # - This feature should only be enabled when using deliver events, otherwise events may be lost during reconnect + # - peerMonitorPeriod must be >0 to enable this feature # - Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby # affecting performance. reconnectBlockHeightLagThreshold: 2 - # blockHeightMonitorPeriod is the period in which the connected peer's block height is monitored. Note that this - # value is only relevant if reconnectBlockHeightLagThreshold >0. - # Default: 5s - blockHeightMonitorPeriod: 3s + # peerMonitorPeriod is the period in which the connected peer is monitored to see if + # the event client should disconnect from it and reconnect to another peer. + # Default: 0 (disabled) + peerMonitorPeriod: 3s # the below timeouts are commented out to use the default values that are found in # "pkg/fab/endpointconfig.go" diff --git a/test/integration/e2e/configless/endpointconfig_override_test.go b/test/integration/e2e/configless/endpointconfig_override_test.go index ba5b2657e0..307fff3508 100644 --- a/test/integration/e2e/configless/endpointconfig_override_test.go +++ b/test/integration/e2e/configless/endpointconfig_override_test.go @@ -720,7 +720,7 @@ func (c *eventServiceConfig) ReconnectBlockHeightLagThreshold() int { return 10 } -func (c *eventServiceConfig) BlockHeightMonitorPeriod() time.Duration { +func (c *eventServiceConfig) PeerMonitorPeriod() time.Duration { return 5 * time.Second }