Skip to content

Commit

Permalink
[FABG-810] Discovery handling of 'access denied' error
Browse files Browse the repository at this point in the history
When an 'access denied' error is received from the Discovery
server, close the discovery service reference so that periodic
refresh is halted.

Change-Id: I48f08e571170d15d4c91785266dd8946cae8276a
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Jan 10, 2019
1 parent 3b2b876 commit 5e291d3
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 38 deletions.
20 changes: 18 additions & 2 deletions pkg/client/common/discovery/dynamicdiscovery/chservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
"github.com/pkg/errors"
)

const (
accessDenied = "access denied"
)

// ChannelService implements a dynamic Discovery Service that queries
// Fabric's Discovery service for information about the peers that
// are currently joined to the given channel.
Expand Down Expand Up @@ -101,8 +105,8 @@ func (s *ChannelService) evaluate(ctx contextAPI.Client, responses []fabdiscover
for _, response := range responses {
endpoints, err := response.ForChannel(s.channelID).Peers()
if err != nil {
lastErr = errors.Wrap(err, "error getting peers from discovery response")
logger.Warn(lastErr.Error())
lastErr = newDiscoveryError(err)
logger.Warnf("error getting peers from discovery response: %s", lastErr)
continue
}
return s.asPeers(ctx, endpoints), nil
Expand Down Expand Up @@ -137,3 +141,15 @@ type peerEndpoint struct {
func (p *peerEndpoint) BlockHeight() uint64 {
return p.blockHeight
}

type discoveryError struct {
error
}

func newDiscoveryError(cause error) *discoveryError {
return &discoveryError{error: cause}
}

func (e *discoveryError) IsFatal() bool {
return e.Error() == accessDenied
}
49 changes: 40 additions & 9 deletions pkg/client/common/discovery/dynamicdiscovery/chservice_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build testing

/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
Expand All @@ -7,6 +9,7 @@ SPDX-License-Identifier: Apache-2.0
package dynamicdiscovery

import (
"errors"
"testing"
"time"

Expand Down Expand Up @@ -57,14 +60,14 @@ func TestDiscoveryService(t *testing.T) {
},
)

clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) {
SetClientProvider(func(ctx contextAPI.Client) (DiscoveryClient, error) {
return discClient, nil
}
})

service, err := NewChannelService(
ctx, mocks.NewMockMembership(), ch,
WithRefreshInterval(500*time.Millisecond),
WithResponseTimeout(2*time.Second),
WithRefreshInterval(10*time.Millisecond),
WithResponseTimeout(100*time.Millisecond),
)
require.NoError(t, err)
defer service.Close()
Expand All @@ -85,7 +88,7 @@ func TestDiscoveryService(t *testing.T) {
},
)

time.Sleep(1 * time.Second)
time.Sleep(20 * time.Millisecond)

peers, err = service.GetPeers()
assert.NoError(t, err)
Expand All @@ -108,16 +111,44 @@ func TestDiscoveryService(t *testing.T) {
},
)

time.Sleep(1 * time.Second)
time.Sleep(20 * time.Millisecond)

peers, err = service.GetPeers()
assert.NoError(t, err)
require.NoError(t, err)
assert.Equalf(t, 2, len(peers), "Expected 2 peers")

filteredService := discovery.NewDiscoveryFilterService(service, &blockHeightFilter{minBlockHeight: 10})
peers, err = filteredService.GetPeers()
require.NoError(t, err)
require.Equalf(t, 1, len(peers), "expecting discovery filter to return only one peer")

// Non-fatal error
discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
Error: errors.New("some transient error"),
},
)

time.Sleep(50 * time.Millisecond)

// GetPeers should return the cached response
peers, err = service.GetPeers()
require.NoError(t, err)
assert.Equalf(t, 2, len(peers), "Expected 2 peers")

// Fatal error (access denied can be due due a user being revoked)
discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
Error: errors.New(accessDenied),
},
)

time.Sleep(50 * time.Millisecond)

// The discovery service should have been closed
_, err = service.GetPeers()
require.Error(t, err)
assert.Equal(t, "Discovery client has been closed due to error: access denied", err.Error())
}

func TestDiscoveryServiceWithNewOrgJoined(t *testing.T) {
Expand Down Expand Up @@ -154,9 +185,9 @@ func TestDiscoveryServiceWithNewOrgJoined(t *testing.T) {
},
)

clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) {
SetClientProvider(func(ctx contextAPI.Client) (DiscoveryClient, error) {
return discClient, nil
}
})

service, err := NewChannelService(
ctx,
Expand Down
18 changes: 18 additions & 0 deletions pkg/client/common/discovery/dynamicdiscovery/chservice_test_env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// +build testing

/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package dynamicdiscovery

import (
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
)

// SetClientProvider overrides the discovery client provider for unit tests
func SetClientProvider(provider func(ctx contextAPI.Client) (DiscoveryClient, error)) {
clientProvider = provider
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build testing

/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build testing

/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
Expand Down Expand Up @@ -45,9 +47,9 @@ func TestLocalDiscoveryService(t *testing.T) {
},
)

clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) {
SetClientProvider(func(ctx contextAPI.Client) (DiscoveryClient, error) {
return discClient, nil
}
})

// Test initialize with invalid MSP ID
service := newLocalService(config, mspID2)
Expand Down
43 changes: 32 additions & 11 deletions pkg/client/common/discovery/dynamicdiscovery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
"github.com/pkg/errors"
)

type discoveryClient interface {
// DiscoveryClient is the client to the discovery service
type DiscoveryClient interface {
Send(ctx context.Context, req *discclient.Request, targets ...fab.PeerConfig) ([]fabdiscovery.Response, error)
}

// clientProvider is overridden by unit tests
var clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) {
var clientProvider = func(ctx contextAPI.Client) (DiscoveryClient, error) {
return fabdiscovery.New(ctx)
}

Expand All @@ -36,8 +37,9 @@ type service struct {
responseTimeout time.Duration
lock sync.RWMutex
ctx contextAPI.Client
discClient discoveryClient
discClient DiscoveryClient
peersRef *lazyref.Reference
lastErr error
}

type queryPeers func() ([]fab.Peer, error)
Expand All @@ -57,15 +59,27 @@ func newService(config fab.EndpointConfig, query queryPeers, opts ...coptions.Op
logger.Debugf("Cache refresh interval: %s", options.refreshInterval)
logger.Debugf("Deliver service response timeout: %s", options.responseTimeout)

return &service{
s := &service{
responseTimeout: options.responseTimeout,
peersRef: lazyref.New(
func() (interface{}, error) {
return query()
},
lazyref.WithRefreshInterval(lazyref.InitOnFirstAccess, options.refreshInterval),
),
}

s.peersRef = lazyref.New(
func() (interface{}, error) {
peers, err := query()
if err != nil {
derr, ok := err.(*discoveryError)
if ok && derr.IsFatal() {
logger.Warnf("Got fatal error [%s]. Closing discovery client.", err)
s.lastErr = err
go func() { s.Close() }()
}
}
return peers, err
},
lazyref.WithRefreshInterval(lazyref.InitOnFirstAccess, options.refreshInterval),
)

return s
}

// initialize initializes the service with client context
Expand Down Expand Up @@ -98,6 +112,13 @@ func (s *service) Close() {

// GetPeers returns the available peers
func (s *service) GetPeers() ([]fab.Peer, error) {
if s.peersRef.IsClosed() {
if s.lastErr != nil {
return nil, errors.Errorf("Discovery client has been closed due to error: %s", s.lastErr)
}
return nil, errors.Errorf("Discovery client has been closed")
}

refValue, err := s.peersRef.Get()
if err != nil {
return nil, err
Expand All @@ -115,7 +136,7 @@ func (s *service) context() contextAPI.Client {
return s.ctx
}

func (s *service) discoveryClient() discoveryClient {
func (s *service) discoveryClient() DiscoveryClient {
s.lock.RLock()
defer s.lock.RUnlock()
return s.discClient
Expand Down
51 changes: 50 additions & 1 deletion pkg/fabsdk/provider/chpvdr/chprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ SPDX-License-Identifier: Apache-2.0
package chpvdr

import (
"errors"
"testing"

"github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery/dynamicdiscovery"
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery/staticdiscovery"
clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/dynamicselection"
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/fabricselection"

"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/msp"
Expand Down Expand Up @@ -113,3 +114,51 @@ func TestBasicValidChannel(t *testing.T) {
_, ok = selection.(*fabricselection.Service)
assert.Truef(t, ok, "Expecting selection to be Fabric for v1_2")
}

func TestAccessDenied(t *testing.T) {
ctx := mocks.NewMockProviderContext()

user := mspmocks.NewMockSigningIdentity("user", "user")

clientCtx := &mockClientContext{
Providers: ctx,
SigningIdentity: user,
}

discClient := clientmocks.NewMockDiscoveryClient()

discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
Error: errors.New("access denied"),
},
)

dynamicdiscovery.SetClientProvider(func(ctx context.Client) (dynamicdiscovery.DiscoveryClient, error) {
return discClient, nil
})

cp, err := New(clientCtx.EndpointConfig())
require.NoError(t, err)

err = cp.Initialize(ctx)
assert.NoError(t, err)

testChannelCfg := mocks.NewMockChannelCfg("testchannel")
testChannelCfg.MockCapabilities[fab.ApplicationGroupKey][fab.V1_2Capability] = true
mockChConfigCache := newMockChCfgCache(chconfig.NewChannelCfg(""))
mockChConfigCache.Put(testChannelCfg)
cp.chCfgCache = mockChConfigCache

channelService, err := cp.ChannelService(clientCtx, "testchannel")
require.NoError(t, err)

discovery, err := channelService.Discovery()
require.NoError(t, err)
require.NotNil(t, discovery)
_, ok := discovery.(*dynamicdiscovery.ChannelService)
assert.Truef(t, ok, "Expecting discovery to be Dynamic for v1_2")

_, err = discovery.GetPeers()
require.Error(t, err)
assert.Equal(t, "access denied", err.Error())
}
25 changes: 12 additions & 13 deletions pkg/util/concurrent/lazyref/lazyref.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type Reference struct {
lastTimeAccessed unsafe.Pointer
lock sync.RWMutex
wg sync.WaitGroup
closed bool
closed uint32
running bool
closech chan bool
}
Expand Down Expand Up @@ -135,8 +135,17 @@ func NewWithData(initializer InitializerWithData, opts ...options.Opt) *Referenc
return lazyRef
}

// IsClosed returns true if the referenced has been closed
func (r *Reference) IsClosed() bool {
return atomic.LoadUint32(&r.closed) == 1
}

// Get returns the value, or an error if the initialiser returned an error.
func (r *Reference) Get(data ...interface{}) (interface{}, error) {
if r.IsClosed() {
return nil, errors.New("reference is already closed")
}

// Try outside of a lock
if value, ok := r.get(); ok {
return value, nil
Expand All @@ -145,10 +154,6 @@ func (r *Reference) Get(data ...interface{}) (interface{}, error) {
r.lock.Lock()
defer r.lock.Unlock()

if r.closed {
return nil, errors.New("reference is already closed")
}

// Try again inside the lock
if value, ok := r.get(); ok {
return value, nil
Expand Down Expand Up @@ -192,13 +197,7 @@ func (r *Reference) Close() {
}

func (r *Reference) setClosed() bool {
r.lock.Lock()
defer r.lock.Unlock()
if r.closed {
return false
}
r.closed = true
return true
return atomic.CompareAndSwapUint32(&r.closed, 0, 1)
}

func (r *Reference) notifyClosing() {
Expand Down Expand Up @@ -241,7 +240,7 @@ func (r *Reference) setTimerRunning() bool {
r.lock.Lock()
defer r.lock.Unlock()

if r.running || r.closed {
if r.running || r.IsClosed() {
logger.Debug("Cannot start timer since timer is either already running or it is closed")
return false
}
Expand Down

0 comments on commit 5e291d3

Please sign in to comment.