From 96a926daf72b78fe78871b0d8efe0cca59bc85a9 Mon Sep 17 00:00:00 2001 From: Troy Ronda Date: Sun, 11 Mar 2018 19:28:28 -0400 Subject: [PATCH] [FAB-8782] Resolve linter in peer/ord/comm This change resolve gometalinter warnings in: pkg/fab/peer pkg/fab/orderer pkg/fab/comm Change-Id: I390f5d22f0f78e2e0a9109e8796cef50ea36bdc0 Signed-off-by: Troy Ronda --- pkg/fab/comm/connection.go | 4 +- pkg/fab/comm/connection_test.go | 16 +--- pkg/fab/comm/connector.go | 18 ++++- pkg/fab/comm/connector_test.go | 4 +- pkg/fab/orderer/orderer.go | 123 ++++++++++++++++-------------- pkg/fab/orderer/orderer_test.go | 16 ---- pkg/fab/peer/peer.go | 11 +-- pkg/fab/peer/peerendorser.go | 1 - pkg/fab/peer/peerendorser_test.go | 2 - 9 files changed, 94 insertions(+), 101 deletions(-) diff --git a/pkg/fab/comm/connection.go b/pkg/fab/comm/connection.go index 367d1f0742..8c6fe7f93b 100755 --- a/pkg/fab/comm/connection.go +++ b/pkg/fab/comm/connection.go @@ -63,8 +63,8 @@ func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvide stream, err := streamProvider(grpcconn) if err != nil { - if err := grpcconn.Close(); err != nil { - logger.Warnf("error closing GRPC connection: %s", err) + if closeErr := grpcconn.Close(); err != nil { + logger.Warnf("error closing GRPC connection: %s", closeErr) } return nil, errors.Wrapf(err, "could not create stream to %s", url) } diff --git a/pkg/fab/comm/connection_test.go b/pkg/fab/comm/connection_test.go index 7f6b63a71d..c06ce86e45 100755 --- a/pkg/fab/comm/connection_test.go +++ b/pkg/fab/comm/connection_test.go @@ -14,7 +14,6 @@ import ( "google.golang.org/grpc/keepalive" fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context" - "github.com/hyperledger/fabric-sdk-go/pkg/context/api/core" eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/mocks" fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" @@ -37,11 +36,11 @@ func TestConnection(t *testing.T) { context := newMockContext() chConfig := fabmocks.NewMockChannelCfg(channelID) - conn, err := NewConnection(context, chConfig, testStream, "") + _, err := NewConnection(context, chConfig, testStream, "") if err == nil { t.Fatalf("expected error creating new connection with empty URL") } - conn, err = NewConnection(context, chConfig, testStream, "invalidhost:0000", + _, err = NewConnection(context, chConfig, testStream, "invalidhost:0000", WithFailFast(true), WithCertificate(nil), WithHostOverride(""), @@ -51,12 +50,12 @@ func TestConnection(t *testing.T) { if err == nil { t.Fatalf("expected error creating new connection with invalid URL") } - conn, err = NewConnection(context, chConfig, invalidStream, peerURL) + _, err = NewConnection(context, chConfig, invalidStream, peerURL) if err == nil { t.Fatalf("expected error creating new connection with invalid stream but got none") } - conn, err = NewConnection(context, chConfig, testStream, peerURL) + conn, err := NewConnection(context, chConfig, testStream, peerURL) if err != nil { t.Fatalf("error creating new connection: %s", err) } @@ -85,13 +84,6 @@ func TestConnection(t *testing.T) { var testServer *eventmocks.MockEventhubServer var endorserAddr []string -func newPeerConfig(peerURL string) *core.PeerConfig { - return &core.PeerConfig{ - URL: peerURL, - GRPCOptions: make(map[string]interface{}), - } -} - func newMockContext() fabcontext.Client { return fabmocks.NewMockContext(fabmocks.NewMockUser("test")) } diff --git a/pkg/fab/comm/connector.go b/pkg/fab/comm/connector.go index 24503fc47b..791148eacc 100644 --- a/pkg/fab/comm/connector.go +++ b/pkg/fab/comm/connector.go @@ -230,7 +230,9 @@ func (cc *CachingConnector) removeConn(target string) { if ok { delete(cc.index, c.conn) cc.conns.Delete(target) - c.conn.Close() + if err := c.conn.Close(); err != nil { + logger.Debugf("unable to close connection [%s]", err) + } } } } @@ -316,7 +318,11 @@ func cache(conns map[string]*cachedConn, updateConn *cachedConn) { logger.Debugf("new connection in connection janitor") } else if c.conn != updateConn.conn { logger.Debugf("connection change in connection janitor") - c.conn.Close() // Not blocking + + if err := c.conn.Close(); err != nil { + logger.Debugf("unable to close connection [%s]", err) + } + } else { logger.Debugf("updating existing connection in connection janitor") } @@ -347,9 +353,13 @@ func sweep(conns map[string]*cachedConn, idleTime time.Duration) []string { } func closeConn(conn *grpc.ClientConn) { - conn.Close() + if err := conn.Close(); err != nil { + logger.Debugf("unable to close connection [%s]", err) + } ctx, cancel := context.WithTimeout(context.Background(), connShutdownTimeout) - waitConn(ctx, conn, connectivity.Shutdown) + if err := waitConn(ctx, conn, connectivity.Shutdown); err != nil { + logger.Debugf("unable to wait for connection close [%s]", err) + } cancel() } diff --git a/pkg/fab/comm/connector_test.go b/pkg/fab/comm/connector_test.go index d92a010013..daa5333d1b 100644 --- a/pkg/fab/comm/connector_test.go +++ b/pkg/fab/comm/connector_test.go @@ -195,17 +195,15 @@ func testDial(t *testing.T, wg *sync.WaitGroup, connector *CachingConnector, add conn, err := connector.DialContext(ctx, addr, grpc.WithInsecure()) cancel() assert.Nil(t, err, "DialContext should have succeeded") + defer connector.ReleaseConn(conn) endorserClient := pb.NewEndorserClient(conn) - ctx, cancel = context.WithTimeout(context.Background(), normalTimeout) proposal := pb.SignedProposal{} resp, err := endorserClient.ProcessProposal(context.Background(), &proposal) - cancel() assert.Nil(t, err, "peer process proposal should not have error") assert.Equal(t, int32(200), resp.GetResponse().Status) randomSleep := rand.Intn(maxSleepBeforeRelease) time.Sleep(time.Duration(minSleepBeforeRelease)*time.Millisecond + time.Duration(randomSleep)*time.Millisecond) - connector.ReleaseConn(conn) } diff --git a/pkg/fab/orderer/orderer.go b/pkg/fab/orderer/orderer.go index bedd754a50..7db98bd41e 100644 --- a/pkg/fab/orderer/orderer.go +++ b/pkg/fab/orderer/orderer.go @@ -33,17 +33,16 @@ var logger = logging.NewLogger("fabsdk/fab") // Orderer allows a client to broadcast a transaction. type Orderer struct { - config core.Config - url string - tlsCACert *x509.Certificate - serverName string - grpcDialOption []grpc.DialOption - kap keepalive.ClientParameters - dialTimeout time.Duration - failFast bool - transportCredentials credentials.TransportCredentials - allowInsecure bool - commManager fab.CommManager + config core.Config + url string + serverName string + tlsCACert *x509.Certificate + grpcDialOption []grpc.DialOption + kap keepalive.ClientParameters + dialTimeout time.Duration + failFast bool + allowInsecure bool + commManager fab.CommManager } // Option describes a functional parameter for the New constructor @@ -245,7 +244,7 @@ func (o *Orderer) SendBroadcast(ctx reqContext.Context, envelope *fab.SignedEnve } defer o.releaseConn(ctx, conn) - broadcastStream, err := ab.NewAtomicBroadcastClient(conn).Broadcast(ctx) + broadcastClient, err := ab.NewAtomicBroadcastClient(conn).Broadcast(ctx) if err != nil { rpcStatus, ok := grpcstatus.FromError(err) if ok { @@ -253,44 +252,50 @@ func (o *Orderer) SendBroadcast(ctx reqContext.Context, envelope *fab.SignedEnve } return nil, errors.Wrap(err, "NewAtomicBroadcastClient failed") } - done := make(chan bool) - var broadcastErr error - var broadcastStatus *common.Status - go func() { - for { - broadcastResponse, err := broadcastStream.Recv() - logger.Debugf("Orderer.broadcastStream - response:%v, error:%v\n", broadcastResponse, err) - if err != nil { - rpcStatus, ok := grpcstatus.FromError(err) - if ok { - err = status.NewFromGRPCStatus(rpcStatus) - } - broadcastErr = errors.Wrap(err, "broadcast recv failed") - done <- true - return - } - broadcastStatus = &broadcastResponse.Status - if broadcastResponse.Status == common.Status_SUCCESS { - done <- true - return - } - if broadcastResponse.Status != common.Status_SUCCESS { - broadcastErr = status.New(status.OrdererServerStatus, int32(broadcastResponse.Status), broadcastResponse.Info, nil) - done <- true - return - } - } - }() - if err := broadcastStream.Send(&common.Envelope{ + responses := make(chan common.Status) + errs := make(chan error, 1) + + go broadcastStream(broadcastClient, responses, errs) + + err = broadcastClient.Send(&common.Envelope{ Payload: envelope.Payload, Signature: envelope.Signature, - }); err != nil { + }) + if err != nil { return nil, errors.Wrap(err, "failed to send envelope to orderer") } - broadcastStream.CloseSend() - <-done - return broadcastStatus, broadcastErr + if err = broadcastClient.CloseSend(); err != nil { + logger.Debugf("unable to close broadcast client [%s]", err) + } + + select { + case broadcastStatus := <-responses: + return &broadcastStatus, nil + case broadcastErr := <-errs: + return nil, broadcastErr + } +} + +func broadcastStream(broadcastClient ab.AtomicBroadcast_BroadcastClient, responses chan common.Status, errs chan error) { + + broadcastResponse, err := broadcastClient.Recv() + logger.Debugf("Orderer.broadcastStream - response:%v, error:%v", broadcastResponse, err) + if err != nil { + rpcStatus, ok := grpcstatus.FromError(err) + if ok { + err = status.NewFromGRPCStatus(rpcStatus) + } + errs <- errors.Wrap(err, "broadcast recv failed") + return + } + + if broadcastResponse.Status != common.Status_SUCCESS { + errs <- status.New(status.OrdererServerStatus, int32(broadcastResponse.Status), broadcastResponse.Info, nil) + return + } + + responses <- broadcastResponse.Status } // SendDeliver sends a deliver request to the ordering service and returns the @@ -314,7 +319,7 @@ func (o *Orderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelo } // Create atomic broadcast client - broadcastStream, err := ab.NewAtomicBroadcastClient(conn).Deliver(ctx) + broadcastClient, err := ab.NewAtomicBroadcastClient(conn).Deliver(ctx) if err != nil { logger.Errorf("deliver failed [%s]", err) o.releaseConn(ctx, conn) @@ -325,29 +330,33 @@ func (o *Orderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelo // Receive blocks from the GRPC stream and put them on the channel go func() { - blockStream(broadcastStream, responses, errs) + blockStream(broadcastClient, responses, errs) o.releaseConn(ctx, conn) }() // Send block request envelope logger.Debugf("Requesting blocks from ordering service") - if err := broadcastStream.Send(&common.Envelope{ + err = broadcastClient.Send(&common.Envelope{ Payload: envelope.Payload, Signature: envelope.Signature, - }); err != nil { + }) + if err != nil { o.releaseConn(ctx, conn) errs <- errors.Wrap(err, "failed to send block request to orderer") return responses, errs } - broadcastStream.CloseSend() + + if err = broadcastClient.CloseSend(); err != nil { + logger.Debugf("unable to close deliver client [%s]", err) + } return responses, errs } -func blockStream(broadcastStream ab.AtomicBroadcast_DeliverClient, responses chan *common.Block, errs chan error) { +func blockStream(deliverClient ab.AtomicBroadcast_DeliverClient, responses chan *common.Block, errs chan error) { for { - response, err := broadcastStream.Recv() + response, err := deliverClient.Recv() if err != nil { errs <- errors.Wrap(err, "recv from ordering service failed") return @@ -357,11 +366,11 @@ func blockStream(broadcastStream ab.AtomicBroadcast_DeliverClient, responses cha // Seek operation success, no more resposes case *ab.DeliverResponse_Status: logger.Debugf("Received deliver response status from ordering service: %s", t.Status) - if t.Status == common.Status_SUCCESS { - close(responses) + if t.Status != common.Status_SUCCESS { + errs <- errors.Errorf("error status from ordering service %s", t.Status) return } - errs <- errors.Errorf("error status from ordering service %s", t.Status) + close(responses) return // Response is a requested block @@ -386,5 +395,7 @@ func (*defCommManager) DialContext(ctx reqContext.Context, target string, opts . func (*defCommManager) ReleaseConn(conn *grpc.ClientConn) { logger.Debugf("ReleaseConn [%p]", conn) - conn.Close() + if err := conn.Close(); err != nil { + logger.Debugf("unable to close connection [%s]", err) + } } diff --git a/pkg/fab/orderer/orderer_test.go b/pkg/fab/orderer/orderer_test.go index 2dad61a838..8e6d5aac66 100644 --- a/pkg/fab/orderer/orderer_test.go +++ b/pkg/fab/orderer/orderer_test.go @@ -34,22 +34,6 @@ import ( var testOrdererURL = "127.0.0.1:0" -var validRootCA = `-----BEGIN CERTIFICATE----- -MIICYjCCAgmgAwIBAgIUB3CTDOU47sUC5K4kn/Caqnh114YwCgYIKoZIzj0EAwIw -fzELMAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWExFjAUBgNVBAcTDVNh -biBGcmFuY2lzY28xHzAdBgNVBAoTFkludGVybmV0IFdpZGdldHMsIEluYy4xDDAK -BgNVBAsTA1dXVzEUMBIGA1UEAxMLZXhhbXBsZS5jb20wHhcNMTYxMDEyMTkzMTAw -WhcNMjExMDExMTkzMTAwWjB/MQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZv -cm5pYTEWMBQGA1UEBxMNU2FuIEZyYW5jaXNjbzEfMB0GA1UEChMWSW50ZXJuZXQg -V2lkZ2V0cywgSW5jLjEMMAoGA1UECxMDV1dXMRQwEgYDVQQDEwtleGFtcGxlLmNv -bTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABKIH5b2JaSmqiQXHyqC+cmknICcF -i5AddVjsQizDV6uZ4v6s+PWiJyzfA/rTtMvYAPq/yeEHpBUB1j053mxnpMujYzBh -MA4GA1UdDwEB/wQEAwIBBjAPBgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBQXZ0I9 -qp6CP8TFHZ9bw5nRtZxIEDAfBgNVHSMEGDAWgBQXZ0I9qp6CP8TFHZ9bw5nRtZxI -EDAKBggqhkjOPQQDAgNHADBEAiAHp5Rbp9Em1G/UmKn8WsCbqDfWecVbZPQj3RK4 -oG5kQQIgQAe4OOKYhJdh3f7URaKfGTf492/nmRmtK+ySKjpHSrU= ------END CERTIFICATE-----` - var ordererAddr string var ordererMockSrv *mocks.MockBroadcastServer diff --git a/pkg/fab/peer/peer.go b/pkg/fab/peer/peer.go index e13b66bb22..c2cca0a519 100644 --- a/pkg/fab/peer/peer.go +++ b/pkg/fab/peer/peer.go @@ -8,7 +8,6 @@ package peer import ( reqContext "context" - "fmt" "crypto/x509" @@ -48,7 +47,6 @@ func New(config core.Config, opts ...Option) (*Peer, error) { config: config, commManager: &defCommManager{}, } - var err error for _, opt := range opts { err := opt(peer) @@ -70,11 +68,12 @@ func New(config core.Config, opts ...Option) (*Peer, error) { allowInsecure: peer.inSecure, commManager: peer.commManager, } - peer.processor, err = newPeerEndorser(&endorseRequest) + processor, err := newPeerEndorser(&endorseRequest) if err != nil { return nil, err } + peer.processor = processor } return peer, nil @@ -220,7 +219,7 @@ func (p *Peer) ProcessTransactionProposal(ctx reqContext.Context, proposal fab.P } func (p *Peer) String() string { - return fmt.Sprintf("%s", p.url) + return p.url } // PeersToTxnProcessors converts a slice of Peers to a slice of TxnProposalProcessors @@ -243,5 +242,7 @@ func (*defCommManager) DialContext(ctx reqContext.Context, target string, opts . func (*defCommManager) ReleaseConn(conn *grpc.ClientConn) { logger.Debugf("ReleaseConn [%p]", conn) - conn.Close() + if err := conn.Close(); err != nil { + logger.Debugf("unable to close connection [%s]", err) + } } diff --git a/pkg/fab/peer/peerendorser.go b/pkg/fab/peer/peerendorser.go index 7487d78682..a46b1412ef 100644 --- a/pkg/fab/peer/peerendorser.go +++ b/pkg/fab/peer/peerendorser.go @@ -31,7 +31,6 @@ type peerEndorser struct { grpcDialOption []grpc.DialOption target string dialTimeout time.Duration - failFast bool commManager fab.CommManager } diff --git a/pkg/fab/peer/peerendorser_test.go b/pkg/fab/peer/peerendorser_test.go index d94b8774f8..8f2943c20c 100644 --- a/pkg/fab/peer/peerendorser_test.go +++ b/pkg/fab/peer/peerendorser_test.go @@ -31,10 +31,8 @@ import ( ) const ( - ecertPath = "../../../test/fixtures/fabricca/tls/certs/client/client_client1.pem" peer1URL = "localhost:7050" peer2URL = "localhost:7054" - peerURLBad = "localhost:9999" testAddress = "127.0.0.1:0" )