diff --git a/pkg/client/common/discovery/dynamicdiscovery/chservice.go b/pkg/client/common/discovery/dynamicdiscovery/chservice.go index bef1668da0..0aa684a041 100644 --- a/pkg/client/common/discovery/dynamicdiscovery/chservice.go +++ b/pkg/client/common/discovery/dynamicdiscovery/chservice.go @@ -17,6 +17,10 @@ import ( "github.com/pkg/errors" ) +const ( + accessDenied = "access denied" +) + // ChannelService implements a dynamic Discovery Service that queries // Fabric's Discovery service for information about the peers that // are currently joined to the given channel. @@ -101,8 +105,8 @@ func (s *ChannelService) evaluate(ctx contextAPI.Client, responses []fabdiscover for _, response := range responses { endpoints, err := response.ForChannel(s.channelID).Peers() if err != nil { - lastErr = errors.Wrap(err, "error getting peers from discovery response") - logger.Warn(lastErr.Error()) + lastErr = newDiscoveryError(err) + logger.Warnf("error getting peers from discovery response: %s", lastErr) continue } return s.asPeers(ctx, endpoints), nil @@ -137,3 +141,15 @@ type peerEndpoint struct { func (p *peerEndpoint) BlockHeight() uint64 { return p.blockHeight } + +type discoveryError struct { + error +} + +func newDiscoveryError(cause error) *discoveryError { + return &discoveryError{error: cause} +} + +func (e *discoveryError) IsFatal() bool { + return e.Error() == accessDenied +} diff --git a/pkg/client/common/discovery/dynamicdiscovery/chservice_test.go b/pkg/client/common/discovery/dynamicdiscovery/chservice_test.go index 6884be8ad2..a312cb0221 100644 --- a/pkg/client/common/discovery/dynamicdiscovery/chservice_test.go +++ b/pkg/client/common/discovery/dynamicdiscovery/chservice_test.go @@ -1,3 +1,5 @@ +// +build testing + /* Copyright SecureKey Technologies Inc. All Rights Reserved. @@ -7,6 +9,7 @@ SPDX-License-Identifier: Apache-2.0 package dynamicdiscovery import ( + "errors" "testing" "time" @@ -57,14 +60,14 @@ func TestDiscoveryService(t *testing.T) { }, ) - clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) { + SetClientProvider(func(ctx contextAPI.Client) (DiscoveryClient, error) { return discClient, nil - } + }) service, err := NewChannelService( ctx, mocks.NewMockMembership(), ch, - WithRefreshInterval(500*time.Millisecond), - WithResponseTimeout(2*time.Second), + WithRefreshInterval(10*time.Millisecond), + WithResponseTimeout(100*time.Millisecond), ) require.NoError(t, err) defer service.Close() @@ -85,7 +88,7 @@ func TestDiscoveryService(t *testing.T) { }, ) - time.Sleep(1 * time.Second) + time.Sleep(20 * time.Millisecond) peers, err = service.GetPeers() assert.NoError(t, err) @@ -108,16 +111,44 @@ func TestDiscoveryService(t *testing.T) { }, ) - time.Sleep(1 * time.Second) + time.Sleep(20 * time.Millisecond) peers, err = service.GetPeers() - assert.NoError(t, err) + require.NoError(t, err) assert.Equalf(t, 2, len(peers), "Expected 2 peers") filteredService := discovery.NewDiscoveryFilterService(service, &blockHeightFilter{minBlockHeight: 10}) peers, err = filteredService.GetPeers() require.NoError(t, err) require.Equalf(t, 1, len(peers), "expecting discovery filter to return only one peer") + + // Non-fatal error + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + Error: errors.New("some transient error"), + }, + ) + + time.Sleep(50 * time.Millisecond) + + // GetPeers should return the cached response + peers, err = service.GetPeers() + require.NoError(t, err) + assert.Equalf(t, 2, len(peers), "Expected 2 peers") + + // Fatal error (access denied can be due due a user being revoked) + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + Error: errors.New(accessDenied), + }, + ) + + time.Sleep(50 * time.Millisecond) + + // The discovery service should have been closed + _, err = service.GetPeers() + require.Error(t, err) + assert.Equal(t, "Discovery client has been closed due to error: access denied", err.Error()) } func TestDiscoveryServiceWithNewOrgJoined(t *testing.T) { @@ -154,9 +185,9 @@ func TestDiscoveryServiceWithNewOrgJoined(t *testing.T) { }, ) - clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) { + SetClientProvider(func(ctx contextAPI.Client) (DiscoveryClient, error) { return discClient, nil - } + }) service, err := NewChannelService( ctx, diff --git a/pkg/client/common/discovery/dynamicdiscovery/chservice_test_env.go b/pkg/client/common/discovery/dynamicdiscovery/chservice_test_env.go new file mode 100644 index 0000000000..17fcc5a79a --- /dev/null +++ b/pkg/client/common/discovery/dynamicdiscovery/chservice_test_env.go @@ -0,0 +1,18 @@ +// +build testing + +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package dynamicdiscovery + +import ( + contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" +) + +// SetClientProvider overrides the discovery client provider for unit tests +func SetClientProvider(provider func(ctx contextAPI.Client) (DiscoveryClient, error)) { + clientProvider = provider +} diff --git a/pkg/client/common/discovery/dynamicdiscovery/localprovider_test.go b/pkg/client/common/discovery/dynamicdiscovery/localprovider_test.go index f2f09c6327..ad557c67b9 100644 --- a/pkg/client/common/discovery/dynamicdiscovery/localprovider_test.go +++ b/pkg/client/common/discovery/dynamicdiscovery/localprovider_test.go @@ -1,3 +1,5 @@ +// +build testing + /* Copyright SecureKey Technologies Inc. All Rights Reserved. diff --git a/pkg/client/common/discovery/dynamicdiscovery/localservice_test.go b/pkg/client/common/discovery/dynamicdiscovery/localservice_test.go index 6db5b47e2c..771b97c86e 100644 --- a/pkg/client/common/discovery/dynamicdiscovery/localservice_test.go +++ b/pkg/client/common/discovery/dynamicdiscovery/localservice_test.go @@ -1,3 +1,5 @@ +// +build testing + /* Copyright SecureKey Technologies Inc. All Rights Reserved. @@ -45,9 +47,9 @@ func TestLocalDiscoveryService(t *testing.T) { }, ) - clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) { + SetClientProvider(func(ctx contextAPI.Client) (DiscoveryClient, error) { return discClient, nil - } + }) // Test initialize with invalid MSP ID service := newLocalService(config, mspID2) diff --git a/pkg/client/common/discovery/dynamicdiscovery/service.go b/pkg/client/common/discovery/dynamicdiscovery/service.go index a9c5a14f49..eb3c09f025 100644 --- a/pkg/client/common/discovery/dynamicdiscovery/service.go +++ b/pkg/client/common/discovery/dynamicdiscovery/service.go @@ -20,12 +20,13 @@ import ( "github.com/pkg/errors" ) -type discoveryClient interface { +// DiscoveryClient is the client to the discovery service +type DiscoveryClient interface { Send(ctx context.Context, req *discclient.Request, targets ...fab.PeerConfig) ([]fabdiscovery.Response, error) } // clientProvider is overridden by unit tests -var clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) { +var clientProvider = func(ctx contextAPI.Client) (DiscoveryClient, error) { return fabdiscovery.New(ctx) } @@ -36,8 +37,9 @@ type service struct { responseTimeout time.Duration lock sync.RWMutex ctx contextAPI.Client - discClient discoveryClient + discClient DiscoveryClient peersRef *lazyref.Reference + lastErr error } type queryPeers func() ([]fab.Peer, error) @@ -57,15 +59,27 @@ func newService(config fab.EndpointConfig, query queryPeers, opts ...coptions.Op logger.Debugf("Cache refresh interval: %s", options.refreshInterval) logger.Debugf("Deliver service response timeout: %s", options.responseTimeout) - return &service{ + s := &service{ responseTimeout: options.responseTimeout, - peersRef: lazyref.New( - func() (interface{}, error) { - return query() - }, - lazyref.WithRefreshInterval(lazyref.InitOnFirstAccess, options.refreshInterval), - ), } + + s.peersRef = lazyref.New( + func() (interface{}, error) { + peers, err := query() + if err != nil { + derr, ok := err.(*discoveryError) + if ok && derr.IsFatal() { + logger.Warnf("Got fatal error [%s]. Closing discovery client.", err) + s.lastErr = err + go func() { s.Close() }() + } + } + return peers, err + }, + lazyref.WithRefreshInterval(lazyref.InitOnFirstAccess, options.refreshInterval), + ) + + return s } // initialize initializes the service with client context @@ -98,6 +112,13 @@ func (s *service) Close() { // GetPeers returns the available peers func (s *service) GetPeers() ([]fab.Peer, error) { + if s.peersRef.IsClosed() { + if s.lastErr != nil { + return nil, errors.Errorf("Discovery client has been closed due to error: %s", s.lastErr) + } + return nil, errors.Errorf("Discovery client has been closed") + } + refValue, err := s.peersRef.Get() if err != nil { return nil, err @@ -115,7 +136,7 @@ func (s *service) context() contextAPI.Client { return s.ctx } -func (s *service) discoveryClient() discoveryClient { +func (s *service) discoveryClient() DiscoveryClient { s.lock.RLock() defer s.lock.RUnlock() return s.discClient diff --git a/pkg/fabsdk/provider/chpvdr/chprovider_test.go b/pkg/fabsdk/provider/chpvdr/chprovider_test.go index 6d16303df4..1cc56a22fb 100644 --- a/pkg/fabsdk/provider/chpvdr/chprovider_test.go +++ b/pkg/fabsdk/provider/chpvdr/chprovider_test.go @@ -9,13 +9,14 @@ SPDX-License-Identifier: Apache-2.0 package chpvdr import ( + "errors" "testing" "github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery/dynamicdiscovery" "github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery/staticdiscovery" + clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks" "github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/dynamicselection" "github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/fabricselection" - "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/msp" @@ -113,3 +114,51 @@ func TestBasicValidChannel(t *testing.T) { _, ok = selection.(*fabricselection.Service) assert.Truef(t, ok, "Expecting selection to be Fabric for v1_2") } + +func TestAccessDenied(t *testing.T) { + ctx := mocks.NewMockProviderContext() + + user := mspmocks.NewMockSigningIdentity("user", "user") + + clientCtx := &mockClientContext{ + Providers: ctx, + SigningIdentity: user, + } + + discClient := clientmocks.NewMockDiscoveryClient() + + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + Error: errors.New("access denied"), + }, + ) + + dynamicdiscovery.SetClientProvider(func(ctx context.Client) (dynamicdiscovery.DiscoveryClient, error) { + return discClient, nil + }) + + cp, err := New(clientCtx.EndpointConfig()) + require.NoError(t, err) + + err = cp.Initialize(ctx) + assert.NoError(t, err) + + testChannelCfg := mocks.NewMockChannelCfg("testchannel") + testChannelCfg.MockCapabilities[fab.ApplicationGroupKey][fab.V1_2Capability] = true + mockChConfigCache := newMockChCfgCache(chconfig.NewChannelCfg("")) + mockChConfigCache.Put(testChannelCfg) + cp.chCfgCache = mockChConfigCache + + channelService, err := cp.ChannelService(clientCtx, "testchannel") + require.NoError(t, err) + + discovery, err := channelService.Discovery() + require.NoError(t, err) + require.NotNil(t, discovery) + _, ok := discovery.(*dynamicdiscovery.ChannelService) + assert.Truef(t, ok, "Expecting discovery to be Dynamic for v1_2") + + _, err = discovery.GetPeers() + require.Error(t, err) + assert.Equal(t, "access denied", err.Error()) +} diff --git a/pkg/util/concurrent/lazyref/lazyref.go b/pkg/util/concurrent/lazyref/lazyref.go index 245ca328a2..9918accc0a 100644 --- a/pkg/util/concurrent/lazyref/lazyref.go +++ b/pkg/util/concurrent/lazyref/lazyref.go @@ -79,7 +79,7 @@ type Reference struct { lastTimeAccessed unsafe.Pointer lock sync.RWMutex wg sync.WaitGroup - closed bool + closed uint32 running bool closech chan bool } @@ -135,8 +135,17 @@ func NewWithData(initializer InitializerWithData, opts ...options.Opt) *Referenc return lazyRef } +// IsClosed returns true if the referenced has been closed +func (r *Reference) IsClosed() bool { + return atomic.LoadUint32(&r.closed) == 1 +} + // Get returns the value, or an error if the initialiser returned an error. func (r *Reference) Get(data ...interface{}) (interface{}, error) { + if r.IsClosed() { + return nil, errors.New("reference is already closed") + } + // Try outside of a lock if value, ok := r.get(); ok { return value, nil @@ -145,10 +154,6 @@ func (r *Reference) Get(data ...interface{}) (interface{}, error) { r.lock.Lock() defer r.lock.Unlock() - if r.closed { - return nil, errors.New("reference is already closed") - } - // Try again inside the lock if value, ok := r.get(); ok { return value, nil @@ -192,13 +197,7 @@ func (r *Reference) Close() { } func (r *Reference) setClosed() bool { - r.lock.Lock() - defer r.lock.Unlock() - if r.closed { - return false - } - r.closed = true - return true + return atomic.CompareAndSwapUint32(&r.closed, 0, 1) } func (r *Reference) notifyClosing() { @@ -241,7 +240,7 @@ func (r *Reference) setTimerRunning() bool { r.lock.Lock() defer r.lock.Unlock() - if r.running || r.closed { + if r.running || r.IsClosed() { logger.Debug("Cannot start timer since timer is either already running or it is closed") return false }