Skip to content

Commit

Permalink
[FAB-8782] Resolve linter in peer/ord/comm
Browse files Browse the repository at this point in the history
This change resolve gometalinter warnings in:
pkg/fab/peer
pkg/fab/orderer
pkg/fab/comm

Change-Id: I390f5d22f0f78e2e0a9109e8796cef50ea36bdc0
Signed-off-by: Troy Ronda <troy@troyronda.com>
  • Loading branch information
troyronda committed Mar 11, 2018
1 parent 81a872e commit 96a926d
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 101 deletions.
4 changes: 2 additions & 2 deletions pkg/fab/comm/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvide

stream, err := streamProvider(grpcconn)
if err != nil {
if err := grpcconn.Close(); err != nil {
logger.Warnf("error closing GRPC connection: %s", err)
if closeErr := grpcconn.Close(); err != nil {
logger.Warnf("error closing GRPC connection: %s", closeErr)
}
return nil, errors.Wrapf(err, "could not create stream to %s", url)
}
Expand Down
16 changes: 4 additions & 12 deletions pkg/fab/comm/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"google.golang.org/grpc/keepalive"

fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/mocks"
fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"

Expand All @@ -37,11 +36,11 @@ func TestConnection(t *testing.T) {
context := newMockContext()
chConfig := fabmocks.NewMockChannelCfg(channelID)

conn, err := NewConnection(context, chConfig, testStream, "")
_, err := NewConnection(context, chConfig, testStream, "")
if err == nil {
t.Fatalf("expected error creating new connection with empty URL")
}
conn, err = NewConnection(context, chConfig, testStream, "invalidhost:0000",
_, err = NewConnection(context, chConfig, testStream, "invalidhost:0000",
WithFailFast(true),
WithCertificate(nil),
WithHostOverride(""),
Expand All @@ -51,12 +50,12 @@ func TestConnection(t *testing.T) {
if err == nil {
t.Fatalf("expected error creating new connection with invalid URL")
}
conn, err = NewConnection(context, chConfig, invalidStream, peerURL)
_, err = NewConnection(context, chConfig, invalidStream, peerURL)
if err == nil {
t.Fatalf("expected error creating new connection with invalid stream but got none")
}

conn, err = NewConnection(context, chConfig, testStream, peerURL)
conn, err := NewConnection(context, chConfig, testStream, peerURL)
if err != nil {
t.Fatalf("error creating new connection: %s", err)
}
Expand Down Expand Up @@ -85,13 +84,6 @@ func TestConnection(t *testing.T) {
var testServer *eventmocks.MockEventhubServer
var endorserAddr []string

func newPeerConfig(peerURL string) *core.PeerConfig {
return &core.PeerConfig{
URL: peerURL,
GRPCOptions: make(map[string]interface{}),
}
}

func newMockContext() fabcontext.Client {
return fabmocks.NewMockContext(fabmocks.NewMockUser("test"))
}
18 changes: 14 additions & 4 deletions pkg/fab/comm/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ func (cc *CachingConnector) removeConn(target string) {
if ok {
delete(cc.index, c.conn)
cc.conns.Delete(target)
c.conn.Close()
if err := c.conn.Close(); err != nil {
logger.Debugf("unable to close connection [%s]", err)
}
}
}
}
Expand Down Expand Up @@ -316,7 +318,11 @@ func cache(conns map[string]*cachedConn, updateConn *cachedConn) {
logger.Debugf("new connection in connection janitor")
} else if c.conn != updateConn.conn {
logger.Debugf("connection change in connection janitor")
c.conn.Close() // Not blocking

if err := c.conn.Close(); err != nil {
logger.Debugf("unable to close connection [%s]", err)
}

} else {
logger.Debugf("updating existing connection in connection janitor")
}
Expand Down Expand Up @@ -347,9 +353,13 @@ func sweep(conns map[string]*cachedConn, idleTime time.Duration) []string {
}

func closeConn(conn *grpc.ClientConn) {
conn.Close()
if err := conn.Close(); err != nil {
logger.Debugf("unable to close connection [%s]", err)
}

ctx, cancel := context.WithTimeout(context.Background(), connShutdownTimeout)
waitConn(ctx, conn, connectivity.Shutdown)
if err := waitConn(ctx, conn, connectivity.Shutdown); err != nil {
logger.Debugf("unable to wait for connection close [%s]", err)
}
cancel()
}
4 changes: 1 addition & 3 deletions pkg/fab/comm/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,17 +195,15 @@ func testDial(t *testing.T, wg *sync.WaitGroup, connector *CachingConnector, add
conn, err := connector.DialContext(ctx, addr, grpc.WithInsecure())
cancel()
assert.Nil(t, err, "DialContext should have succeeded")
defer connector.ReleaseConn(conn)

endorserClient := pb.NewEndorserClient(conn)
ctx, cancel = context.WithTimeout(context.Background(), normalTimeout)
proposal := pb.SignedProposal{}
resp, err := endorserClient.ProcessProposal(context.Background(), &proposal)
cancel()

assert.Nil(t, err, "peer process proposal should not have error")
assert.Equal(t, int32(200), resp.GetResponse().Status)

randomSleep := rand.Intn(maxSleepBeforeRelease)
time.Sleep(time.Duration(minSleepBeforeRelease)*time.Millisecond + time.Duration(randomSleep)*time.Millisecond)
connector.ReleaseConn(conn)
}
123 changes: 67 additions & 56 deletions pkg/fab/orderer/orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,16 @@ var logger = logging.NewLogger("fabsdk/fab")

// Orderer allows a client to broadcast a transaction.
type Orderer struct {
config core.Config
url string
tlsCACert *x509.Certificate
serverName string
grpcDialOption []grpc.DialOption
kap keepalive.ClientParameters
dialTimeout time.Duration
failFast bool
transportCredentials credentials.TransportCredentials
allowInsecure bool
commManager fab.CommManager
config core.Config
url string
serverName string
tlsCACert *x509.Certificate
grpcDialOption []grpc.DialOption
kap keepalive.ClientParameters
dialTimeout time.Duration
failFast bool
allowInsecure bool
commManager fab.CommManager
}

// Option describes a functional parameter for the New constructor
Expand Down Expand Up @@ -245,52 +244,58 @@ func (o *Orderer) SendBroadcast(ctx reqContext.Context, envelope *fab.SignedEnve
}
defer o.releaseConn(ctx, conn)

broadcastStream, err := ab.NewAtomicBroadcastClient(conn).Broadcast(ctx)
broadcastClient, err := ab.NewAtomicBroadcastClient(conn).Broadcast(ctx)
if err != nil {
rpcStatus, ok := grpcstatus.FromError(err)
if ok {
err = status.NewFromGRPCStatus(rpcStatus)
}
return nil, errors.Wrap(err, "NewAtomicBroadcastClient failed")
}
done := make(chan bool)
var broadcastErr error
var broadcastStatus *common.Status

go func() {
for {
broadcastResponse, err := broadcastStream.Recv()
logger.Debugf("Orderer.broadcastStream - response:%v, error:%v\n", broadcastResponse, err)
if err != nil {
rpcStatus, ok := grpcstatus.FromError(err)
if ok {
err = status.NewFromGRPCStatus(rpcStatus)
}
broadcastErr = errors.Wrap(err, "broadcast recv failed")
done <- true
return
}
broadcastStatus = &broadcastResponse.Status
if broadcastResponse.Status == common.Status_SUCCESS {
done <- true
return
}
if broadcastResponse.Status != common.Status_SUCCESS {
broadcastErr = status.New(status.OrdererServerStatus, int32(broadcastResponse.Status), broadcastResponse.Info, nil)
done <- true
return
}
}
}()
if err := broadcastStream.Send(&common.Envelope{
responses := make(chan common.Status)
errs := make(chan error, 1)

go broadcastStream(broadcastClient, responses, errs)

err = broadcastClient.Send(&common.Envelope{
Payload: envelope.Payload,
Signature: envelope.Signature,
}); err != nil {
})
if err != nil {
return nil, errors.Wrap(err, "failed to send envelope to orderer")
}
broadcastStream.CloseSend()
<-done
return broadcastStatus, broadcastErr
if err = broadcastClient.CloseSend(); err != nil {
logger.Debugf("unable to close broadcast client [%s]", err)
}

select {
case broadcastStatus := <-responses:
return &broadcastStatus, nil
case broadcastErr := <-errs:
return nil, broadcastErr
}
}

func broadcastStream(broadcastClient ab.AtomicBroadcast_BroadcastClient, responses chan common.Status, errs chan error) {

broadcastResponse, err := broadcastClient.Recv()
logger.Debugf("Orderer.broadcastStream - response:%v, error:%v", broadcastResponse, err)
if err != nil {
rpcStatus, ok := grpcstatus.FromError(err)
if ok {
err = status.NewFromGRPCStatus(rpcStatus)
}
errs <- errors.Wrap(err, "broadcast recv failed")
return
}

if broadcastResponse.Status != common.Status_SUCCESS {
errs <- status.New(status.OrdererServerStatus, int32(broadcastResponse.Status), broadcastResponse.Info, nil)
return
}

responses <- broadcastResponse.Status
}

// SendDeliver sends a deliver request to the ordering service and returns the
Expand All @@ -314,7 +319,7 @@ func (o *Orderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelo
}

// Create atomic broadcast client
broadcastStream, err := ab.NewAtomicBroadcastClient(conn).Deliver(ctx)
broadcastClient, err := ab.NewAtomicBroadcastClient(conn).Deliver(ctx)
if err != nil {
logger.Errorf("deliver failed [%s]", err)
o.releaseConn(ctx, conn)
Expand All @@ -325,29 +330,33 @@ func (o *Orderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelo

// Receive blocks from the GRPC stream and put them on the channel
go func() {
blockStream(broadcastStream, responses, errs)
blockStream(broadcastClient, responses, errs)
o.releaseConn(ctx, conn)
}()

// Send block request envelope
logger.Debugf("Requesting blocks from ordering service")
if err := broadcastStream.Send(&common.Envelope{
err = broadcastClient.Send(&common.Envelope{
Payload: envelope.Payload,
Signature: envelope.Signature,
}); err != nil {
})
if err != nil {
o.releaseConn(ctx, conn)

errs <- errors.Wrap(err, "failed to send block request to orderer")
return responses, errs
}
broadcastStream.CloseSend()

if err = broadcastClient.CloseSend(); err != nil {
logger.Debugf("unable to close deliver client [%s]", err)
}

return responses, errs
}

func blockStream(broadcastStream ab.AtomicBroadcast_DeliverClient, responses chan *common.Block, errs chan error) {
func blockStream(deliverClient ab.AtomicBroadcast_DeliverClient, responses chan *common.Block, errs chan error) {
for {
response, err := broadcastStream.Recv()
response, err := deliverClient.Recv()
if err != nil {
errs <- errors.Wrap(err, "recv from ordering service failed")
return
Expand All @@ -357,11 +366,11 @@ func blockStream(broadcastStream ab.AtomicBroadcast_DeliverClient, responses cha
// Seek operation success, no more resposes
case *ab.DeliverResponse_Status:
logger.Debugf("Received deliver response status from ordering service: %s", t.Status)
if t.Status == common.Status_SUCCESS {
close(responses)
if t.Status != common.Status_SUCCESS {
errs <- errors.Errorf("error status from ordering service %s", t.Status)
return
}
errs <- errors.Errorf("error status from ordering service %s", t.Status)
close(responses)
return

// Response is a requested block
Expand All @@ -386,5 +395,7 @@ func (*defCommManager) DialContext(ctx reqContext.Context, target string, opts .

func (*defCommManager) ReleaseConn(conn *grpc.ClientConn) {
logger.Debugf("ReleaseConn [%p]", conn)
conn.Close()
if err := conn.Close(); err != nil {
logger.Debugf("unable to close connection [%s]", err)
}
}
16 changes: 0 additions & 16 deletions pkg/fab/orderer/orderer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,6 @@ import (

var testOrdererURL = "127.0.0.1:0"

var validRootCA = `-----BEGIN CERTIFICATE-----
MIICYjCCAgmgAwIBAgIUB3CTDOU47sUC5K4kn/Caqnh114YwCgYIKoZIzj0EAwIw
fzELMAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWExFjAUBgNVBAcTDVNh
biBGcmFuY2lzY28xHzAdBgNVBAoTFkludGVybmV0IFdpZGdldHMsIEluYy4xDDAK
BgNVBAsTA1dXVzEUMBIGA1UEAxMLZXhhbXBsZS5jb20wHhcNMTYxMDEyMTkzMTAw
WhcNMjExMDExMTkzMTAwWjB/MQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZv
cm5pYTEWMBQGA1UEBxMNU2FuIEZyYW5jaXNjbzEfMB0GA1UEChMWSW50ZXJuZXQg
V2lkZ2V0cywgSW5jLjEMMAoGA1UECxMDV1dXMRQwEgYDVQQDEwtleGFtcGxlLmNv
bTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABKIH5b2JaSmqiQXHyqC+cmknICcF
i5AddVjsQizDV6uZ4v6s+PWiJyzfA/rTtMvYAPq/yeEHpBUB1j053mxnpMujYzBh
MA4GA1UdDwEB/wQEAwIBBjAPBgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBQXZ0I9
qp6CP8TFHZ9bw5nRtZxIEDAfBgNVHSMEGDAWgBQXZ0I9qp6CP8TFHZ9bw5nRtZxI
EDAKBggqhkjOPQQDAgNHADBEAiAHp5Rbp9Em1G/UmKn8WsCbqDfWecVbZPQj3RK4
oG5kQQIgQAe4OOKYhJdh3f7URaKfGTf492/nmRmtK+ySKjpHSrU=
-----END CERTIFICATE-----`

var ordererAddr string
var ordererMockSrv *mocks.MockBroadcastServer

Expand Down
11 changes: 6 additions & 5 deletions pkg/fab/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package peer

import (
reqContext "context"
"fmt"

"crypto/x509"

Expand Down Expand Up @@ -48,7 +47,6 @@ func New(config core.Config, opts ...Option) (*Peer, error) {
config: config,
commManager: &defCommManager{},
}
var err error

for _, opt := range opts {
err := opt(peer)
Expand All @@ -70,11 +68,12 @@ func New(config core.Config, opts ...Option) (*Peer, error) {
allowInsecure: peer.inSecure,
commManager: peer.commManager,
}
peer.processor, err = newPeerEndorser(&endorseRequest)
processor, err := newPeerEndorser(&endorseRequest)

if err != nil {
return nil, err
}
peer.processor = processor
}

return peer, nil
Expand Down Expand Up @@ -220,7 +219,7 @@ func (p *Peer) ProcessTransactionProposal(ctx reqContext.Context, proposal fab.P
}

func (p *Peer) String() string {
return fmt.Sprintf("%s", p.url)
return p.url
}

// PeersToTxnProcessors converts a slice of Peers to a slice of TxnProposalProcessors
Expand All @@ -243,5 +242,7 @@ func (*defCommManager) DialContext(ctx reqContext.Context, target string, opts .

func (*defCommManager) ReleaseConn(conn *grpc.ClientConn) {
logger.Debugf("ReleaseConn [%p]", conn)
conn.Close()
if err := conn.Close(); err != nil {
logger.Debugf("unable to close connection [%s]", err)
}
}
1 change: 0 additions & 1 deletion pkg/fab/peer/peerendorser.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type peerEndorser struct {
grpcDialOption []grpc.DialOption
target string
dialTimeout time.Duration
failFast bool
commManager fab.CommManager
}

Expand Down
Loading

0 comments on commit 96a926d

Please sign in to comment.