From 4b576fda8bf15e856359bd9aa82bd21a6dcf1674 Mon Sep 17 00:00:00 2001 From: Troy Ronda Date: Tue, 27 Feb 2018 16:46:46 -0500 Subject: [PATCH] [FAB-8578] Connection Caching This change introduces the CachingConnector. This component provides the ability to cache GRPC connections. It provides a GRPC compatible Context Dialer interface via the "DialContext" method. Change-Id: Idd2f61e52a3e078cf81042853808c59c9a37b47b Signed-off-by: Troy Ronda --- pkg/context/api/core/provider.go | 4 + pkg/context/api/fab/provider.go | 1 + pkg/core/config/config.go | 19 +- pkg/fab/comm/comm_test.go | 86 +++++ pkg/fab/comm/connection_test.go | 28 +- pkg/fab/comm/connector.go | 332 +++++++++++++++++++ pkg/fab/comm/connector_test.go | 211 ++++++++++++ pkg/fab/peer/peer.go | 41 ++- pkg/fab/peer/peerendorser.go | 8 +- pkg/fab/peer/peerendorser_test.go | 1 + pkg/fabsdk/fabsdk.go | 4 +- pkg/fabsdk/fabsdk_test.go | 3 +- pkg/fabsdk/provider/fabpvdr/fabpvdr.go | 15 +- test/fixtures/config/config_pkcs11_test.yaml | 3 + test/fixtures/config/config_test.yaml | 4 + 15 files changed, 717 insertions(+), 43 deletions(-) create mode 100644 pkg/fab/comm/comm_test.go create mode 100644 pkg/fab/comm/connector.go create mode 100644 pkg/fab/comm/connector_test.go diff --git a/pkg/context/api/core/provider.go b/pkg/context/api/core/provider.go index bb4da4607b..fb2d0709a2 100644 --- a/pkg/context/api/core/provider.go +++ b/pkg/context/api/core/provider.go @@ -76,6 +76,10 @@ const ( OrdererResponse // DiscoveryGreylistExpiry discovery Greylist expiration period DiscoveryGreylistExpiry + // ConnectionIdle is the timeout for closing idle connections + ConnectionIdle + // CacheSweepInterval is the duration between cache sweeps + CacheSweepInterval ) // Providers represents the SDK configured core providers context. diff --git a/pkg/context/api/fab/provider.go b/pkg/context/api/fab/provider.go index 0372574510..790a65da54 100644 --- a/pkg/context/api/fab/provider.go +++ b/pkg/context/api/fab/provider.go @@ -19,6 +19,7 @@ type InfraProvider interface { CreateEventHub(ic IdentityContext, name string) (EventHub, error) CreatePeerFromConfig(peerCfg *core.NetworkPeer) (Peer, error) CreateOrdererFromConfig(cfg *core.OrdererConfig) (Orderer, error) + Close() } // SelectionProvider is used to select peers for endorsement diff --git a/pkg/core/config/config.go b/pkg/core/config/config.go index e768678080..0cc0cd9623 100644 --- a/pkg/core/config/config.go +++ b/pkg/core/config/config.go @@ -34,9 +34,11 @@ import ( var logger = logging.NewLogger(logModule) const ( - cmdRoot = "FABRIC_SDK" - logModule = "fabric_sdk_go" - defaultTimeout = time.Second * 5 + cmdRoot = "FABRIC_SDK" + logModule = "fabric_sdk_go" + defaultTimeout = time.Second * 5 + defaultConnIdleTimeout = time.Second * 30 + defaultCacheSweepInterval = time.Second * 15 ) // Config represents the configuration for the client @@ -458,7 +460,16 @@ func (c *Config) TimeoutOrDefault(conn core.TimeoutType) time.Duration { timeout = c.configViper.GetDuration("client.orderer.timeout.connection") case core.OrdererResponse: timeout = c.configViper.GetDuration("client.orderer.timeout.response") - + case core.CacheSweepInterval: // EXPERIMENTAL - do we need this to be configurable? 
+		timeout = c.configViper.GetDuration("client.cache.interval.sweep")
+		if timeout == 0 {
+			timeout = defaultCacheSweepInterval
+		}
+	case core.ConnectionIdle:
+		timeout = c.configViper.GetDuration("client.cache.timeout.connectionIdle")
+		if timeout == 0 {
+			timeout = defaultConnIdleTimeout
+		}
 	}
 	if timeout == 0 {
 		timeout = defaultTimeout
diff --git a/pkg/fab/comm/comm_test.go b/pkg/fab/comm/comm_test.go
new file mode 100644
index 0000000000..6096c67072
--- /dev/null
+++ b/pkg/fab/comm/comm_test.go
@@ -0,0 +1,86 @@
+/*
+Copyright SecureKey Technologies Inc. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package comm
+
+import (
+	"fmt"
+	"net"
+	"os"
+	"testing"
+	"time"
+
+	eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/mocks"
+	"github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
+	pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
+	"github.com/pkg/errors"
+	"google.golang.org/grpc"
+)
+
+const (
+	peerAddress     = "localhost:9999"
+	endorserAddress = "127.0.0.1:0"
+	peerURL         = "grpc://" + peerAddress
+)
+
+func TestMain(m *testing.M) {
+	var opts []grpc.ServerOption
+	grpcServer := grpc.NewServer(opts...)
+
+	lis, err := net.Listen("tcp", peerAddress)
+	if err != nil {
+		panic(fmt.Sprintf("Error starting events listener %s", err))
+	}
+
+	testServer = eventmocks.NewMockEventhubServer()
+
+	pb.RegisterEventsServer(grpcServer, testServer)
+
+	go grpcServer.Serve(lis)
+
+	srvs, addrs, err := startEndorsers(2, endorserAddress)
+	if err != nil {
+		panic(fmt.Sprintf("Error starting endorser %s", err))
+	}
+	endorserAddr = addrs
+	time.Sleep(2 * time.Second)
+	code := m.Run()
+	for _, srv := range srvs {
+		srv.Stop()
+	}
+	os.Exit(code)
+}
+
+func startEndorsers(count int, address string) ([]*grpc.Server, []string, error) {
+	srvs := make([]*grpc.Server, 0, count)
+	addrs := make([]string, 0, count)
+
+	for i := 0; i < count; i++ {
+		srv := grpc.NewServer()
+		_, addr, ok := startEndorserServer(srv, address)
+		if !ok {
+			return nil, nil, errors.New("unable to start GRPC server")
+		}
+		srvs = append(srvs, srv)
+		addrs = append(addrs, addr)
+	}
+	return srvs, addrs, nil
+}
+
+func startEndorserServer(grpcServer *grpc.Server, address string) (*mocks.MockEndorserServer, string, bool) {
+	lis, err := net.Listen("tcp", address)
+	if err != nil {
+		fmt.Printf("Error starting test server %s\n", err)
+		return nil, "", false
+	}
+	addr := lis.Addr().String()
+
+	endorserServer := &mocks.MockEndorserServer{}
+	pb.RegisterEndorserServer(grpcServer, endorserServer)
+	fmt.Printf("Starting test server on %s\n", addr)
+	go grpcServer.Serve(lis)
+	return endorserServer, addr, true
+}
diff --git a/pkg/fab/comm/connection_test.go b/pkg/fab/comm/connection_test.go
index 9b3e1037d6..4e7a64ba9f 100755
--- a/pkg/fab/comm/connection_test.go
+++ b/pkg/fab/comm/connection_test.go
@@ -8,9 +8,6 @@ package comm
 
 import (
 	"context"
-	"fmt"
-	"net"
-	"os"
 	"testing"
 	"time"
 
@@ -26,11 +23,6 @@ import (
 	"google.golang.org/grpc"
 )
 
-const (
-	peerAddress = "localhost:9999"
-	peerURL     = "grpc://" + peerAddress
-)
-
 var testStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) {
 	return pb.NewDeliverClient(grpcconn).Deliver(context.Background())
 }
@@ -93,25 +85,7 @@ func TestConnection(t *testing.T) {
 
 // Use the Deliver server for testing
 var testServer *eventmocks.MockEventhubServer
-
-func TestMain(m *testing.M) {
-	var opts []grpc.ServerOption
-	grpcServer := grpc.NewServer(opts...)
-
-	lis, err := net.Listen("tcp", peerAddress)
-	if err != nil {
-		panic(fmt.Sprintf("Error starting events listener %s", err))
-	}
-
-	testServer = eventmocks.NewMockEventhubServer()
-
-	pb.RegisterEventsServer(grpcServer, testServer)
-
-	go grpcServer.Serve(lis)
-
-	time.Sleep(2 * time.Second)
-	os.Exit(m.Run())
-}
+var endorserAddr []string
 
 func newPeerConfig(peerURL string) *core.PeerConfig {
 	return &core.PeerConfig{
diff --git a/pkg/fab/comm/connector.go b/pkg/fab/comm/connector.go
new file mode 100644
index 0000000000..2f52d6ab09
--- /dev/null
+++ b/pkg/fab/comm/connector.go
@@ -0,0 +1,332 @@
+/*
+Copyright SecureKey Technologies Inc. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package comm
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/pkg/errors"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/connectivity"
+)
+
+const (
+	connShutdownTimeout = 50 * time.Millisecond
+)
+
+// CachingConnector provides the ability to cache GRPC connections.
+// It provides a GRPC compatible Context Dialer interface via the "DialContext" method.
+// Connections provided by this component are monitored for becoming idle or entering shutdown state.
+// When a connection has had its usages closed for longer than "idleTime", it is closed and removed
+// from the connection cache. Callers must release connections by calling the "ReleaseConn" method.
+// The Close method will flush all remaining open connections. This component should be considered
+// unusable after calling Close.
+//
+// This component has been designed to be safe for concurrency.
+type CachingConnector struct {
+	conns         sync.Map
+	sweepTime     time.Duration
+	idleTime      time.Duration
+	index         map[*grpc.ClientConn]*cachedConn
+	lock          sync.Mutex
+	waitgroup     sync.WaitGroup
+	janitorChan   chan *cachedConn
+	janitorDone   chan bool
+	janitorClosed chan bool
+}
+
+type cachedConn struct {
+	target    string
+	conn      *grpc.ClientConn
+	open      int
+	lastOpen  time.Time
+	lastClose time.Time
+}
+
+// NewCachingConnector creates a GRPC connection cache. The cache is governed by
+// sweepTime and idleTime.
+func NewCachingConnector(sweepTime time.Duration, idleTime time.Duration) *CachingConnector {
+	cc := CachingConnector{
+		conns:         sync.Map{},
+		index:         map[*grpc.ClientConn]*cachedConn{},
+		janitorChan:   make(chan *cachedConn),
+		janitorDone:   make(chan bool),
+		janitorClosed: make(chan bool, 1),
+		sweepTime:     sweepTime,
+		idleTime:      idleTime,
+	}
+
+	// cc.janitorClosed determines if a goroutine needs to be spun up.
+	// The janitor is able to shut itself down when it has no connections to monitor.
+	// When it shuts itself down, it pushes a value onto janitorClosed. We initialize
+	// the go chan with a bootstrap value so that CachingConnector spins up the
+	// goroutine on first usage.
+	cc.janitorClosed <- true
+	return &cc
+}
+
+// Close cleans up cached connections.
+func (cc *CachingConnector) Close() {
+	cc.lock.Lock()
+	defer cc.lock.Unlock()
+
+	if cc.janitorDone != nil {
+		logger.Debug("closing caching GRPC connector")
+
+		select {
+		case <-cc.janitorClosed:
+			logger.Debugf("janitor not running")
+		default:
+			logger.Debugf("janitor running")
+			cc.janitorDone <- true
+			cc.waitgroup.Wait()
+		}
+
+		close(cc.janitorChan)
+		close(cc.janitorClosed)
+		close(cc.janitorDone)
+		cc.janitorDone = nil
+	}
+}
+
+// DialContext is a wrapper for grpc.DialContext where connections are cached.
+func (cc *CachingConnector) DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + logger.Debugf("DialContext: %s", target) + + c, ok := cc.loadConn(target) + if !ok { + createdConn, err := cc.createConn(ctx, target, opts...) + if err != nil { + return nil, errors.WithMessage(err, "connection creation failed") + } + c = createdConn + } + + if err := cc.openConn(ctx, c); err != nil { + return nil, errors.Errorf("dialing connection timed out [%s]", target) + } + return c.conn, nil +} + +// ReleaseConn notifies the cache that the connection is no longer in use. +func (cc *CachingConnector) ReleaseConn(conn *grpc.ClientConn) { + cc.lock.Lock() + defer cc.lock.Unlock() + + cconn, ok := cc.index[conn] + if !ok { + logger.Warnf("connection not found [%p]", conn) + return + } + logger.Debugf("ReleaseConn [%s]", cconn.target) + + if cconn.open > 0 { + cconn.lastClose = time.Now() + cconn.open-- + } + + cc.updateJanitor(cconn) +} + +func (cc *CachingConnector) loadConn(target string) (*cachedConn, bool) { + connRaw, ok := cc.conns.Load(target) + if ok { + c, ok := connRaw.(*cachedConn) + if ok { + if c.conn.GetState() != connectivity.Shutdown { + logger.Debugf("using cached connection [%s: %p]", target, c) + return c, true + } + cc.shutdownConn(c) + } + } + return nil, false +} + +func (cc *CachingConnector) createConn(ctx context.Context, target string, opts ...grpc.DialOption) (*cachedConn, error) { + cc.lock.Lock() + defer cc.lock.Unlock() + + cconn, ok := cc.loadConn(target) + if ok { + return cconn, nil + } + + logger.Debugf("creating connection [%s]", target) + conn, err := grpc.DialContext(ctx, target, opts...) + if err != nil { + return nil, errors.WithMessage(err, "dialing peer failed") + } + + logger.Debugf("storing connection [%s]", target) + cconn = &cachedConn{ + target: target, + conn: conn, + } + cc.conns.Store(target, cconn) + cc.index[conn] = cconn + + return cconn, nil +} + +func (cc *CachingConnector) openConn(ctx context.Context, c *cachedConn) error { + + err := waitConn(ctx, c.conn, connectivity.Ready) + if err != nil { + return err + } + + cc.lock.Lock() + defer cc.lock.Unlock() + c.open++ + c.lastOpen = time.Now() + cc.updateJanitor(c) + + logger.Debugf("connection was opened [%s]", c.target) + return nil +} + +func waitConn(ctx context.Context, conn *grpc.ClientConn, targetState connectivity.State) error { + for { + state := conn.GetState() + if state == targetState { + break + } + if !conn.WaitForStateChange(ctx, state) { + return errors.Wrap(ctx.Err(), "waiting for connection failed") + } + } + return nil +} + +func (cc *CachingConnector) shutdownConn(cconn *cachedConn) { + cc.lock.Lock() + defer cc.lock.Unlock() + + logger.Debugf("connection was shutdown [%s]", cconn.target) + cc.conns.Delete(cconn.target) + delete(cc.index, cconn.conn) + + cconn.open = 0 + cconn.lastClose = time.Time{} + + cc.updateJanitor(cconn) +} + +func (cc *CachingConnector) removeConn(target string) { + cc.lock.Lock() + defer cc.lock.Unlock() + + logger.Debugf("removing connection [%s]", target) + connRaw, ok := cc.conns.Load(target) + if ok { + c, ok := connRaw.(*cachedConn) + if ok { + delete(cc.index, c.conn) + cc.conns.Delete(target) + c.conn.Close() + } + } +} + +func (cc *CachingConnector) updateJanitor(c *cachedConn) { + select { + case <-cc.janitorClosed: + logger.Debugf("janitor not started") + cc.waitgroup.Add(1) + go janitor(cc.sweepTime, cc.idleTime, &cc.waitgroup, cc.janitorChan, cc.janitorClosed, cc.janitorDone, cc.removeConn) + 
default:
+		logger.Debugf("janitor already started")
+	}
+	cClone := *c
+
+	cc.janitorChan <- &cClone
+}
+
+// The janitor monitors open connections for shutdown state or extended non-usage.
+// This component operates by running a sweep with a period determined by "sweepTime".
+// When a connection returns the GRPC status connectivity.Shutdown, or when the connection
+// has had its usages closed for longer than "idleTime", the connection is closed and the
+// "connRemove" notifier is called.
+//
+// The caching connector:
+//    pushes connection information via the "conn" go channel.
+//    notifies the janitor to exit by writing to the "done" go channel.
+//
+// The janitor:
+//    calls the "connRemove" callback when closing a connection.
+//    decrements the "wg" waitgroup when exiting.
+//    writes to the "close" go channel when exiting due to becoming empty.
+
+type connRemoveNotifier func(target string)
+
+func janitor(sweepTime time.Duration, idleTime time.Duration, wg *sync.WaitGroup, conn chan *cachedConn, close chan bool, done chan bool, connRemove connRemoveNotifier) {
+	logger.Debugf("starting connection janitor")
+	defer wg.Done()
+
+	conns := map[string]*cachedConn{}
+	ticker := time.NewTicker(sweepTime)
+	for {
+		select {
+		case <-done:
+			if len(conns) > 0 {
+				logger.Debugf("flushing connection janitor with open connections [%d]", len(conns))
+			} else {
+				logger.Debugf("flushing connection janitor")
+			}
+			flush(conns)
+			return
+		case c := <-conn:
+			logger.Debugf("updating connection in connection janitor")
+			conns[c.target] = c
+		case <-ticker.C:
+			rm := sweep(conns, idleTime)
+			for _, target := range rm {
+				connRemove(target)
+				delete(conns, target)
+			}
+
+			if len(conns) == 0 {
+				logger.Debugf("closing connection janitor")
+				close <- true
+				return
+			}
+		}
+	}
+}
+
+func flush(conns map[string]*cachedConn) {
+	for _, c := range conns {
+		logger.Debugf("connection janitor closing connection [%s]", c.target)
+		closeConn(c.conn)
+	}
+}
+
+func sweep(conns map[string]*cachedConn, idleTime time.Duration) []string {
+	rm := make([]string, 0, len(conns))
+	now := time.Now()
+	for _, c := range conns {
+		if c.open == 0 && now.After(c.lastClose.Add(idleTime)) {
+			logger.Debugf("connection janitor closing connection [%s]", c.target)
+			rm = append(rm, c.target)
+		} else if c.conn.GetState() == connectivity.Shutdown {
+			logger.Debugf("connection already closed [%s]", c.target)
+			rm = append(rm, c.target)
+		}
+	}
+	return rm
+}
+
+func closeConn(conn *grpc.ClientConn) {
+	conn.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), connShutdownTimeout)
+	waitConn(ctx, conn, connectivity.Shutdown)
+	cancel()
+}
diff --git a/pkg/fab/comm/connector_test.go b/pkg/fab/comm/connector_test.go
new file mode 100644
index 0000000000..d92a010013
--- /dev/null
+++ b/pkg/fab/comm/connector_test.go
@@ -0,0 +1,211 @@
+/*
+Copyright SecureKey Technologies Inc. All Rights Reserved.
+ +SPDX-License-Identifier: Apache-2.0 +*/ + +package comm + +import ( + "context" + "math/rand" + "sync" + "testing" + "time" + "unsafe" + + pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" +) + +const ( + normalTimeout = 5 * time.Second + + normalSweepTime = 5 * time.Second + normalIdleTime = 10 * time.Second + shortSweepTime = 100 * time.Millisecond + shortIdleTime = 200 * time.Millisecond + shortSleepTime = 1000 +) + +func TestConnectorHappyPath(t *testing.T) { + connector := NewCachingConnector(normalSweepTime, normalIdleTime) + defer connector.Close() + + ctx, cancel := context.WithTimeout(context.Background(), normalTimeout) + conn1, err := connector.DialContext(ctx, endorserAddr[0], grpc.WithInsecure()) + cancel() + assert.Nil(t, err, "DialContext should have succeeded") + + assert.NotEqual(t, connectivity.Connecting, conn1.GetState(), "connection should not be connecting") + assert.NotEqual(t, connectivity.Shutdown, conn1.GetState(), "connection should not be shutdown") + + ctx, cancel = context.WithTimeout(context.Background(), normalTimeout) + conn2, err := connector.DialContext(ctx, endorserAddr[0], grpc.WithInsecure()) + cancel() + assert.Nil(t, err, "DialContext should have succeeded") + assert.Equal(t, unsafe.Pointer(conn1), unsafe.Pointer(conn2), "connections should match") + + ctx, cancel = context.WithTimeout(context.Background(), normalTimeout) + conn3, err := connector.DialContext(ctx, endorserAddr[1], grpc.WithInsecure()) + cancel() + + assert.NotEqual(t, connectivity.Connecting, conn3.GetState(), "connection should not be connecting") + assert.NotEqual(t, connectivity.Shutdown, conn3.GetState(), "connection should not be shutdown") + + assert.Nil(t, err, "DialContext should have succeeded") + assert.NotEqual(t, unsafe.Pointer(conn1), unsafe.Pointer(conn3), "connections should not match") +} + +func TestConnectorDoubleClose(t *testing.T) { + connector := NewCachingConnector(normalSweepTime, normalIdleTime) + defer connector.Close() + connector.Close() +} + +func TestConnectorHappyFlushNumber1(t *testing.T) { + connector := NewCachingConnector(normalSweepTime, normalIdleTime) + defer connector.Close() + + ctx, cancel := context.WithTimeout(context.Background(), normalTimeout) + conn1, err := connector.DialContext(ctx, endorserAddr[0], grpc.WithInsecure()) + cancel() + assert.Nil(t, err, "DialContext should have succeeded") + + connector.Close() + assert.Equal(t, connectivity.Shutdown, conn1.GetState(), "connection should be shutdown") +} + +func TestConnectorHappyFlushNumber2(t *testing.T) { + connector := NewCachingConnector(normalSweepTime, normalIdleTime) + defer connector.Close() + + ctx, cancel := context.WithTimeout(context.Background(), normalTimeout) + conn1, err := connector.DialContext(ctx, endorserAddr[0], grpc.WithInsecure()) + cancel() + assert.Nil(t, err, "DialContext should have succeeded") + + ctx, cancel = context.WithTimeout(context.Background(), normalTimeout) + conn2, err := connector.DialContext(ctx, endorserAddr[0], grpc.WithInsecure()) + cancel() + assert.Nil(t, err, "DialContext should have succeeded") + + ctx, cancel = context.WithTimeout(context.Background(), normalTimeout) + conn3, err := connector.DialContext(ctx, endorserAddr[1], grpc.WithInsecure()) + cancel() + assert.Nil(t, err, "DialContext should have succeeded") + + connector.Close() + assert.Equal(t, connectivity.Shutdown, 
conn1.GetState(), "connection should be shutdown") + assert.Equal(t, connectivity.Shutdown, conn2.GetState(), "connection should be shutdown") + assert.Equal(t, connectivity.Shutdown, conn3.GetState(), "connection should be shutdown") +} + +func TestConnectorShouldJanitorRestart(t *testing.T) { + connector := NewCachingConnector(shortSweepTime, shortIdleTime) + defer connector.Close() + + ctx, cancel := context.WithTimeout(context.Background(), normalTimeout) + conn1, err := connector.DialContext(ctx, endorserAddr[0], grpc.WithInsecure()) + cancel() + assert.Nil(t, err, "DialContext should have succeeded") + + connector.ReleaseConn(conn1) + time.Sleep(shortIdleTime * 3) + + ctx, cancel = context.WithTimeout(context.Background(), normalTimeout) + conn2, err := connector.DialContext(ctx, endorserAddr[0], grpc.WithInsecure()) + cancel() + assert.Nil(t, err, "DialContext should have succeeded") + + assert.NotEqual(t, unsafe.Pointer(conn1), unsafe.Pointer(conn2), "connections should be different due to disconnect") +} + +func TestConnectorShouldSweep(t *testing.T) { + connector := NewCachingConnector(shortSweepTime, shortIdleTime) + defer connector.Close() + + ctx, cancel := context.WithTimeout(context.Background(), normalTimeout) + conn1, err := connector.DialContext(ctx, endorserAddr[0], grpc.WithInsecure()) + cancel() + assert.Nil(t, err, "DialContext should have succeeded") + + ctx, cancel = context.WithTimeout(context.Background(), normalTimeout) + conn3, err := connector.DialContext(ctx, endorserAddr[1], grpc.WithInsecure()) + cancel() + assert.Nil(t, err, "DialContext should have succeeded") + + connector.ReleaseConn(conn1) + time.Sleep(shortIdleTime * 3) + assert.Equal(t, connectivity.Shutdown, conn1.GetState(), "connection should be shutdown") + assert.NotEqual(t, connectivity.Shutdown, conn3.GetState(), "connection should not be shutdown") + + ctx, cancel = context.WithTimeout(context.Background(), normalTimeout) + conn4, err := connector.DialContext(ctx, endorserAddr[0], grpc.WithInsecure()) + cancel() + assert.Nil(t, err, "DialContext should have succeeded") + + assert.NotEqual(t, unsafe.Pointer(conn1), unsafe.Pointer(conn4), "connections should be different due to disconnect") +} + +func TestConnectorConcurrent(t *testing.T) { + const goroutines = 50 + + connector := NewCachingConnector(shortSweepTime, shortIdleTime) + defer connector.Close() + + wg := sync.WaitGroup{} + + // Test immediate release + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go testDial(t, &wg, connector, endorserAddr[i%2], 0, 1) + } + wg.Wait() + + // Test long intervals for releasing connection + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go testDial(t, &wg, connector, endorserAddr[i%2], shortSleepTime*3, 1) + } + wg.Wait() + + // Test mixed intervals for releasing connection + wg.Add(goroutines) + for i := 0; i < goroutines/2; i++ { + go testDial(t, &wg, connector, endorserAddr[0], 0, 1) + go testDial(t, &wg, connector, endorserAddr[1], shortSleepTime*3, 1) + } + wg.Wait() + + // Test random intervals for releasing connection + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go testDial(t, &wg, connector, endorserAddr[i%2], 0, shortSleepTime*3) + } + wg.Wait() +} + +func testDial(t *testing.T, wg *sync.WaitGroup, connector *CachingConnector, addr string, minSleepBeforeRelease int, maxSleepBeforeRelease int) { + defer wg.Done() + + ctx, cancel := context.WithTimeout(context.Background(), normalTimeout) + conn, err := connector.DialContext(ctx, addr, grpc.WithInsecure()) + 
cancel()
+	assert.Nil(t, err, "DialContext should have succeeded")
+
+	endorserClient := pb.NewEndorserClient(conn)
+	ctx, cancel = context.WithTimeout(context.Background(), normalTimeout)
+	proposal := pb.SignedProposal{}
+	resp, err := endorserClient.ProcessProposal(ctx, &proposal)
+	cancel()
+
+	assert.Nil(t, err, "peer process proposal should not have error")
+	assert.Equal(t, int32(200), resp.GetResponse().Status)
+
+	randomSleep := rand.Intn(maxSleepBeforeRelease)
+	time.Sleep(time.Duration(minSleepBeforeRelease)*time.Millisecond + time.Duration(randomSleep)*time.Millisecond)
+	connector.ReleaseConn(conn)
+}
diff --git a/pkg/fab/peer/peer.go b/pkg/fab/peer/peer.go
index 240a43d540..9f48c905a0 100644
--- a/pkg/fab/peer/peer.go
+++ b/pkg/fab/peer/peer.go
@@ -7,26 +7,34 @@ SPDX-License-Identifier: Apache-2.0
 package peer
 
 import (
+	"context"
 	"encoding/pem"
 	"fmt"
 
 	"crypto/x509"
 
+	"github.com/spf13/cast"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
+
 	"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
 	"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
 	"github.com/hyperledger/fabric-sdk-go/pkg/core/config/urlutil"
 	"github.com/hyperledger/fabric-sdk-go/pkg/errors/status"
 	"github.com/hyperledger/fabric-sdk-go/pkg/logging"
-	"github.com/spf13/cast"
-	"google.golang.org/grpc/keepalive"
 )
 
 var logger = logging.NewLogger("fabric_sdk_go")
 
 const (
-	connBlocking = true
+	connBlocking = false
 )
 
+type connProvider interface {
+	DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error)
+	ReleaseConn(conn *grpc.ClientConn)
+}
+
 // Peer represents a node in the target blockchain network to which
 // HFC sends endorsement proposals, transaction ordering or query requests.
 type Peer struct {
@@ -42,6 +50,7 @@ type Peer struct {
 	kap           keepalive.ClientParameters
 	failFast      bool
 	inSecure      bool
+	connector     connProvider
 }
 
 // Option describes a functional parameter for the New constructor
@@ -49,7 +58,10 @@ type Option func(*Peer) error
 
 // New Returns a new Peer instance
 func New(config core.Config, opts ...Option) (*Peer, error) {
-	peer := &Peer{config: config}
+	peer := &Peer{
+		config:    config,
+		connector: &defConnector{},
+	}
 
 	var err error
 	for _, opt := range opts {
@@ -71,6 +83,7 @@ func New(config core.Config, opts ...Option) (*Peer, error) {
 		kap:           peer.kap,
 		failFast:      peer.failFast,
 		allowInsecure: peer.inSecure,
+		connector:     peer.connector,
 	}
 
 	peer.processor, err = newPeerEndorser(&endorseRequest)
@@ -197,6 +210,15 @@ func WithPeerProcessor(processor fab.ProposalProcessor) Option {
 	}
 }
 
+// WithConnProvider allows a custom GRPC connection provider to be used.
+func WithConnProvider(provider connProvider) Option {
+	return func(p *Peer) error {
+		p.connector = provider
+
+		return nil
+	}
+}
+
 // Name gets the Peer name.
 func (p *Peer) Name() string {
 	return p.name
@@ -267,3 +289,14 @@ func PeersToTxnProcessors(peers []fab.Peer) []fab.ProposalProcessor {
 	}
 	return tpp
 }
+
+type defConnector struct{}
+
+func (*defConnector) DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
+	opts = append(opts, grpc.WithBlock())
+	return grpc.DialContext(ctx, target, opts...)
+} + +func (*defConnector) ReleaseConn(conn *grpc.ClientConn) { + conn.Close() +} diff --git a/pkg/fab/peer/peerendorser.go b/pkg/fab/peer/peerendorser.go index dc75779daf..07efc4fde8 100644 --- a/pkg/fab/peer/peerendorser.go +++ b/pkg/fab/peer/peerendorser.go @@ -34,6 +34,7 @@ type peerEndorser struct { transportCredentials credentials.TransportCredentials secured bool allowInsecure bool + connector connProvider } type peerEndorserRequest struct { @@ -45,6 +46,7 @@ type peerEndorserRequest struct { kap keepalive.ClientParameters failFast bool allowInsecure bool + connector connProvider } func newPeerEndorser(endorseReq *peerEndorserRequest) (*peerEndorser, error) { @@ -72,7 +74,7 @@ func newPeerEndorser(endorseReq *peerEndorserRequest) (*peerEndorser, error) { pc := &peerEndorser{grpcDialOption: opts, target: urlutil.ToAddress(endorseReq.target), dialTimeout: timeout, transportCredentials: credentials.NewTLS(tlsConfig), secured: urlutil.AttemptSecured(endorseReq.target), - allowInsecure: endorseReq.allowInsecure} + allowInsecure: endorseReq.allowInsecure, connector: endorseReq.connector} return pc, nil } @@ -108,11 +110,11 @@ func (p *peerEndorser) conn(secured bool) (*grpc.ClientConn, error) { ctx, cancel := grpccontext.WithTimeout(ctx, p.dialTimeout) defer cancel() - return grpc.DialContext(ctx, p.target, grpcOpts...) + return p.connector.DialContext(ctx, p.target, grpcOpts...) } func (p *peerEndorser) releaseConn(conn *grpc.ClientConn) { - conn.Close() + p.connector.ReleaseConn(conn) } func (p *peerEndorser) sendProposal(proposal fab.ProcessProposalRequest, secured bool) (*pb.ProposalResponse, error) { diff --git a/pkg/fab/peer/peerendorser_test.go b/pkg/fab/peer/peerendorser_test.go index 0d2169eb44..036d8bac28 100644 --- a/pkg/fab/peer/peerendorser_test.go +++ b/pkg/fab/peer/peerendorser_test.go @@ -304,6 +304,7 @@ func getPeerEndorserRequest(url string, cert *x509.Certificate, serverHostOverri kap: kap, failFast: false, allowInsecure: allowInsecure, + connector: &defConnector{}, } } diff --git a/pkg/fabsdk/fabsdk.go b/pkg/fabsdk/fabsdk.go index 708c16f3fa..b039306aac 100644 --- a/pkg/fabsdk/fabsdk.go +++ b/pkg/fabsdk/fabsdk.go @@ -241,7 +241,7 @@ func initSDK(sdk *FabricSDK, opts []Option) error { // Close frees up caches and connections being maintained by the SDK func (sdk *FabricSDK) Close() { - // TODO: upcoming changes will have Close funcs being called from here. + sdk.fabricProvider.Close() } // Config returns the SDK's configuration. 
@@ -272,7 +272,7 @@ func (sdk *FabricSDK) context() context.Providers { context.WithFabricProvider(sdk.fabricProvider), context.WithChannelProvider(sdk.channelProvider)) c := context.SDKContext{ - *fabContext, + FabContext: *fabContext, } return &c } diff --git a/pkg/fabsdk/fabsdk_test.go b/pkg/fabsdk/fabsdk_test.go index fc68f277ae..a6fbb38681 100644 --- a/pkg/fabsdk/fabsdk_test.go +++ b/pkg/fabsdk/fabsdk_test.go @@ -39,12 +39,11 @@ func goodOpt() Option { } func TestNewBadOpt(t *testing.T) { - sdk, err := New(configImpl.FromFile(sdkConfigFile), + _, err := New(configImpl.FromFile(sdkConfigFile), badOpt()) if err == nil { t.Fatalf("Expected error from New") } - sdk.Close() } func badOpt() Option { diff --git a/pkg/fabsdk/provider/fabpvdr/fabpvdr.go b/pkg/fabsdk/provider/fabpvdr/fabpvdr.go index 59d3a1d2b2..d83f3efd22 100644 --- a/pkg/fabsdk/provider/fabpvdr/fabpvdr.go +++ b/pkg/fabsdk/provider/fabpvdr/fabpvdr.go @@ -13,6 +13,7 @@ import ( channelImpl "github.com/hyperledger/fabric-sdk-go/pkg/fab/channel" "github.com/hyperledger/fabric-sdk-go/pkg/fab/channel/membership" "github.com/hyperledger/fabric-sdk-go/pkg/fab/chconfig" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/comm" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events" "github.com/hyperledger/fabric-sdk-go/pkg/fab/orderer" peerImpl "github.com/hyperledger/fabric-sdk-go/pkg/fab/peer" @@ -24,6 +25,7 @@ import ( // FabricProvider represents the default implementation of Fabric objects. type FabricProvider struct { providerContext core.Providers + connector *comm.CachingConnector } type fabContext struct { @@ -33,12 +35,23 @@ type fabContext struct { // New creates a FabricProvider enabling access to core Fabric objects and functionality. func New(ctx core.Providers) *FabricProvider { + idleTime := ctx.Config().TimeoutOrDefault(core.ConnectionIdle) + sweepTime := ctx.Config().TimeoutOrDefault(core.CacheSweepInterval) + + cc := comm.NewCachingConnector(sweepTime, idleTime) + f := FabricProvider{ providerContext: ctx, + connector: cc, } return &f } +// Close frees resources and caches. +func (f *FabricProvider) Close() { + f.connector.Close() +} + // CreateResourceClient returns a new client initialized for the current instance of the SDK. func (f *FabricProvider) CreateResourceClient(ic fab.IdentityContext) (api.Resource, error) { ctx := &fabContext{ @@ -120,7 +133,7 @@ func (f *FabricProvider) CreateChannelTransactor(ic fab.IdentityContext, cfg fab // CreatePeerFromConfig returns a new default implementation of Peer based configuration func (f *FabricProvider) CreatePeerFromConfig(peerCfg *core.NetworkPeer) (fab.Peer, error) { - return peerImpl.New(f.providerContext.Config(), peerImpl.FromPeerConfig(peerCfg)) + return peerImpl.New(f.providerContext.Config(), peerImpl.FromPeerConfig(peerCfg), peerImpl.WithConnProvider(f.connector)) } // CreateOrdererFromConfig creates a default implementation of Orderer based on configuration. 
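
A minimal sketch of driving the CachingConnector directly, for reviewers tracing the provider wiring above. It assumes only the import paths introduced in this patch; the peer address is a placeholder, and grpc.WithInsecure() mirrors the connector tests:

    package main

    import (
    	"context"
    	"time"

    	"github.com/hyperledger/fabric-sdk-go/pkg/fab/comm"
    	"google.golang.org/grpc"
    )

    func main() {
    	// Sweep every 15s; close connections idle for longer than 30s
    	// (the defaults added to pkg/core/config/config.go).
    	connector := comm.NewCachingConnector(15*time.Second, 30*time.Second)
    	defer connector.Close()

    	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    	defer cancel()

    	// "peer.example.com:7051" is a placeholder target address.
    	conn, err := connector.DialContext(ctx, "peer.example.com:7051", grpc.WithInsecure())
    	if err != nil {
    		panic(err)
    	}

    	// ... use conn (e.g. pb.NewEndorserClient(conn)) ...

    	// Return the connection to the cache; the janitor closes it once it
    	// has been idle for longer than idleTime.
    	connector.ReleaseConn(conn)
    }
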
diff --git a/test/fixtures/config/config_pkcs11_test.yaml b/test/fixtures/config/config_pkcs11_test.yaml index b0c2b0d145..024db3090c 100755 --- a/test/fixtures/config/config_pkcs11_test.yaml +++ b/test/fixtures/config/config_pkcs11_test.yaml @@ -62,6 +62,9 @@ client: timeout: connection: 3s response: 5s + cache: + timeout: + connectionIdle: 30s # Some SDKs support pluggable KV stores, the properties under "credentialStore" diff --git a/test/fixtures/config/config_test.yaml b/test/fixtures/config/config_test.yaml index a4e4e373c1..b36d431e96 100755 --- a/test/fixtures/config/config_test.yaml +++ b/test/fixtures/config/config_test.yaml @@ -63,6 +63,10 @@ client: timeout: connection: 3s response: 5s + cache: + timeout: + connectionIdle: 30s + # Root of the MSP directories with keys and certs. cryptoconfig:
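
The fixtures above set client.cache.timeout.connectionIdle but leave client.cache.interval.sweep unset, so TimeoutOrDefault falls back to the 15-second sweep default in pkg/core/config/config.go. A minimal sketch of the resolution performed by fabpvdr.New, assuming a loaded core.Config value named cfg (the function name newConnector is illustrative):

    package example

    import (
    	"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
    	"github.com/hyperledger/fabric-sdk-go/pkg/fab/comm"
    )

    // newConnector mirrors fabpvdr.New: unset config keys fall back to the
    // defaults in pkg/core/config/config.go (15s sweep, 30s idle).
    func newConnector(cfg core.Config) *comm.CachingConnector {
    	idleTime := cfg.TimeoutOrDefault(core.ConnectionIdle)
    	sweepTime := cfg.TimeoutOrDefault(core.CacheSweepInterval)
    	return comm.NewCachingConnector(sweepTime, idleTime)
    }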