From 8f5d6f4978a22fee1dc815141c586854e15b5795 Mon Sep 17 00:00:00 2001 From: Bob Stasyszyn Date: Tue, 13 Mar 2018 09:30:07 -0400 Subject: [PATCH] [FAB-8839] Use connection cache in event client The deliver and event hub client now use the connection cache. Change-Id: Iab7906eb006de4a27e97ab5de386077aa71ab99e Signed-off-by: Bob Stasyszyn --- pkg/fab/comm/connection.go | 29 ++++++----- pkg/fab/comm/connection_test.go | 10 ++-- pkg/fab/comm/mocks.go | 48 +++++++++++++++++++ .../connection/connection_test.go | 4 +- .../connection/connection_test.go | 8 ++-- pkg/fabsdk/provider/fabpvdr/fabpvdr.go | 7 ++- 6 files changed, 84 insertions(+), 22 deletions(-) create mode 100644 pkg/fab/comm/mocks.go diff --git a/pkg/fab/comm/connection.go b/pkg/fab/comm/connection.go index fae15f309b..8a170ba944 100755 --- a/pkg/fab/comm/connection.go +++ b/pkg/fab/comm/connection.go @@ -7,13 +7,14 @@ SPDX-License-Identifier: Apache-2.0 package comm import ( - "context" + reqContext "context" "sync/atomic" "github.com/pkg/errors" fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + "github.com/hyperledger/fabric-sdk-go/pkg/context" "github.com/hyperledger/fabric-sdk-go/pkg/context/api/core" "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" "github.com/hyperledger/fabric-sdk-go/pkg/core/config/comm" @@ -34,6 +35,7 @@ type GRPCConnection struct { chConfig fab.ChannelCfg conn *grpc.ClientConn stream grpc.ClientStream + commManager fab.CommManager tlsCertHash []byte done int32 } @@ -52,20 +54,22 @@ func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvide return nil, err } - grpcctx := context.Background() - grpcctx, cancel := context.WithTimeout(grpcctx, params.connectTimeout) + reqCtx, cancel := reqContext.WithTimeout(context.NewRequest(ctx), params.connectTimeout) defer cancel() - grpcconn, err := grpc.DialContext(grpcctx, endpoint.ToAddress(url), dialOpts...) + commManager, ok := context.RequestCommManager(reqCtx) + if !ok { + return nil, errors.New("unable to get comm manager") + } + + grpcconn, err := commManager.DialContext(reqCtx, endpoint.ToAddress(url), dialOpts...) if err != nil { return nil, errors.Wrapf(err, "could not connect to %s", url) } stream, err := streamProvider(grpcconn) if err != nil { - if closeErr := grpcconn.Close(); closeErr != nil { - logger.Warnf("error closing GRPC connection: %s", closeErr) - } + commManager.ReleaseConn(grpcconn) return nil, errors.Wrapf(err, "could not create stream to %s", url) } @@ -76,6 +80,7 @@ func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvide return &GRPCConnection{ context: ctx, chConfig: chConfig, + commManager: commManager, conn: grpcconn, stream: stream, tlsCertHash: comm.TLSCertHash(ctx.Config()), @@ -94,15 +99,15 @@ func (c *GRPCConnection) Close() { return } - logger.Debugf("Closing stream....") + logger.Debug("Closing stream....") if err := c.stream.CloseSend(); err != nil { logger.Warnf("error closing GRPC stream: %s", err) } - logger.Debugf("Closing connection....") - if err := c.conn.Close(); err != nil { - logger.Warnf("error closing GRPC connection: %s", err) - } + logger.Debug("Releasing connection....") + c.commManager.ReleaseConn(c.conn) + + logger.Debug("... connection successfully closed.") } // Closed returns true if the connection has been closed diff --git a/pkg/fab/comm/connection_test.go b/pkg/fab/comm/connection_test.go index c06ce86e45..9637ccedfb 100755 --- a/pkg/fab/comm/connection_test.go +++ b/pkg/fab/comm/connection_test.go @@ -13,7 +13,6 @@ import ( "google.golang.org/grpc/keepalive" - fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context" eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/mocks" fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" @@ -43,6 +42,7 @@ func TestConnection(t *testing.T) { _, err = NewConnection(context, chConfig, testStream, "invalidhost:0000", WithFailFast(true), WithCertificate(nil), + WithInsecure(), WithHostOverride(""), WithKeepAliveParams(keepalive.ClientParameters{}), WithConnectTimeout(3*time.Second), @@ -80,10 +80,12 @@ func TestConnection(t *testing.T) { conn.Close() } -// Use the Deliver server for testing +// Use the Event Hub server for testing var testServer *eventmocks.MockEventhubServer var endorserAddr []string -func newMockContext() fabcontext.Client { - return fabmocks.NewMockContext(fabmocks.NewMockUser("test")) +func newMockContext() *fabmocks.MockContext { + context := fabmocks.NewMockContext(fabmocks.NewMockUser("test")) + context.SetCustomInfraProvider(NewMockInfraProvider()) + return context } diff --git a/pkg/fab/comm/mocks.go b/pkg/fab/comm/mocks.go new file mode 100644 index 0000000000..36dbd63c74 --- /dev/null +++ b/pkg/fab/comm/mocks.go @@ -0,0 +1,48 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package comm + +import ( + "context" + + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" + fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" + "google.golang.org/grpc" +) + +// MockCommManager is a non-caching comm manager used +// for unit testing +type MockCommManager struct { +} + +// DialContext creates a connection +func (m *MockCommManager) DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + return grpc.DialContext(ctx, target, opts...) +} + +// ReleaseConn closes the connection +func (m *MockCommManager) ReleaseConn(conn *grpc.ClientConn) { + if err := conn.Close(); err != nil { + logger.Warnf("Error closing connection: %s", err) + } +} + +// MockInfraProvider overrides the comm manager to return +// the MockCommManager +type MockInfraProvider struct { + fabmocks.MockInfraProvider +} + +// NewMockInfraProvider return a new MockInfraProvider +func NewMockInfraProvider() *MockInfraProvider { + return &MockInfraProvider{} +} + +// CommManager returns the MockCommManager +func (f *MockInfraProvider) CommManager() fab.CommManager { + return &MockCommManager{} +} diff --git a/pkg/fab/events/deliverclient/connection/connection_test.go b/pkg/fab/events/deliverclient/connection/connection_test.go index 33b88cee2e..edc6b680ac 100755 --- a/pkg/fab/events/deliverclient/connection/connection_test.go +++ b/pkg/fab/events/deliverclient/connection/connection_test.go @@ -220,5 +220,7 @@ func TestMain(m *testing.M) { } func newMockContext() *fabmocks.MockContext { - return fabmocks.NewMockContext(fabmocks.NewMockUser("user1")) + context := fabmocks.NewMockContext(fabmocks.NewMockUser("user1")) + context.SetCustomInfraProvider(comm.NewMockInfraProvider()) + return context } diff --git a/pkg/fab/events/eventhubclient/connection/connection_test.go b/pkg/fab/events/eventhubclient/connection/connection_test.go index e9d4d9f963..577369d850 100755 --- a/pkg/fab/events/eventhubclient/connection/connection_test.go +++ b/pkg/fab/events/eventhubclient/connection/connection_test.go @@ -13,8 +13,8 @@ import ( "testing" "time" - fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context" "github.com/hyperledger/fabric-sdk-go/pkg/context/api/core" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/comm" clientdisp "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/mocks" fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" @@ -198,6 +198,8 @@ func newPeerConfig(eventURL string) *core.PeerConfig { } } -func newMockContext() fabcontext.Client { - return fabmocks.NewMockContext(fabmocks.NewMockUser("user1")) +func newMockContext() *fabmocks.MockContext { + context := fabmocks.NewMockContext(fabmocks.NewMockUser("user1")) + context.SetCustomInfraProvider(comm.NewMockInfraProvider()) + return context } diff --git a/pkg/fabsdk/provider/fabpvdr/fabpvdr.go b/pkg/fabsdk/provider/fabpvdr/fabpvdr.go index 4d4d3247af..a2738b1fd5 100644 --- a/pkg/fabsdk/provider/fabpvdr/fabpvdr.go +++ b/pkg/fabsdk/provider/fabpvdr/fabpvdr.go @@ -83,10 +83,13 @@ func (f *InfraProvider) Initialize(providers context.Providers) error { // Close frees resources and caches. func (f *InfraProvider) Close() { - logger.Debug("Closing comm manager...") - f.commManager.Close() logger.Debug("Closing event service cache...") f.eventServiceCache.Close() + + // Comm Manager must be closed last since other resources + // may still be using it. + logger.Debug("Closing comm manager...") + f.commManager.Close() } // CommManager provides comm support such as GRPC onnections