Skip to content

Commit

Permalink
[FAB-8755] Event Client Cleanup
Browse files Browse the repository at this point in the history
- Enhancments to unit tests
- Ensure debug level is used for logging debug
- Fix bug where incorrect payload was being
  signed in deliver connection
- Add insecure option to connection

Change-Id: Ib5e6030e739add5f27795f11cadb57b0b0b9fcde
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Mar 9, 2018
1 parent 352430f commit 8e5aa5d
Show file tree
Hide file tree
Showing 17 changed files with 272 additions and 231 deletions.
2 changes: 1 addition & 1 deletion pkg/fab/comm/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func newDialOpts(config core.Config, url string, params *params) ([]grpc.DialOpt

dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.FailFast(params.failFast)))

if urlutil.IsTLSEnabled(url) {
if urlutil.AttemptSecured(url, params.insecure) {
tlsConfig, err := comm.TLSConfig(params.certificate, params.hostOverride, config)
if err != nil {
return nil, err
Expand Down
20 changes: 20 additions & 0 deletions pkg/fab/comm/connectionopts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type params struct {
certificate *x509.Certificate
keepAliveParams keepalive.ClientParameters
failFast bool
insecure bool
connectTimeout time.Duration
}

Expand Down Expand Up @@ -74,6 +75,16 @@ func WithConnectTimeout(value time.Duration) options.Opt {
}
}

// WithInsecure indicates to fall back to an insecure connection if the
// connection URL does not specify a protocol
func WithInsecure() options.Opt {
return func(p options.Params) {
if setter, ok := p.(insecureSetter); ok {
setter.SetInsecure(true)
}
}
}

func (p *params) SetHostOverride(value string) {
logger.Debugf("HostOverride: %s", value)
p.hostOverride = value
Expand All @@ -99,6 +110,11 @@ func (p *params) SetConnectTimeout(value time.Duration) {
p.connectTimeout = value
}

func (p *params) SetInsecure(value bool) {
logger.Debugf("Insecure: %t", value)
p.insecure = value
}

type hostOverrideSetter interface {
SetHostOverride(value string)
}
Expand All @@ -115,6 +131,10 @@ type failFastSetter interface {
SetFailFast(value bool)
}

type insecureSetter interface {
SetInsecure(value bool)
}

type connectTimeoutSetter interface {
SetConnectTimeout(value time.Duration)
}
33 changes: 25 additions & 8 deletions pkg/fab/events/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func New(permitBlockEvents bool, dispatcher eventservice.Dispatcher, opts ...opt
return &Client{
Service: *eventservice.New(dispatcher, opts...),
params: *params,
connEvent: make(chan *fab.ConnectionEvent),
connectionState: int32(Disconnected),
permitBlockEvents: permitBlockEvents,
}
Expand Down Expand Up @@ -117,9 +116,7 @@ func (c *Client) Close() {

logger.Debugf("Stopping client...")

if c.connEventCh != nil {
close(c.connEventCh)
}
c.closeConnectEventChan()

logger.Debugf("Sending disconnect request...")

Expand Down Expand Up @@ -302,10 +299,7 @@ func (c *Client) monitorConnection() {
break
}

if c.connEventCh != nil {
logger.Debugln("Sending connection event to subscriber.")
c.connEventCh <- event
}
c.notifyConnectEventChan(event)

if event.Connected {
logger.Debugf("Event client has connected")
Expand Down Expand Up @@ -346,6 +340,29 @@ func (c *Client) reconnect() {
}
}

func (c *Client) closeConnectEventChan() {
c.Lock()
defer c.Unlock()
if c.connEventCh != nil {
close(c.connEventCh)
}
}

func (c *Client) connectEventChan() chan *fab.ConnectionEvent {
c.RLock()
defer c.RUnlock()
return c.connEventCh
}

func (c *Client) notifyConnectEventChan(event *fab.ConnectionEvent) {
c.RLock()
defer c.RUnlock()
if c.connEventCh != nil {
logger.Debugln("Sending connection event to subscriber.")
c.connEventCh <- event
}
}

func (s ConnectionState) String() string {
switch s {
case Disconnected:
Expand Down
17 changes: 9 additions & 8 deletions pkg/fab/events/deliverclient/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (c *DeliverConnection) Send(seekInfo *ab.SeekInfo) error {
return errors.New("connection is closed")
}

logger.Debugf("Sending %v\n", seekInfo)
logger.Debugf("Sending %#v", seekInfo)

env, err := c.createSignedEnvelope(seekInfo)
if err != nil {
Expand All @@ -110,32 +110,34 @@ func (c *DeliverConnection) Receive(eventch chan<- interface{}) {
for {
stream := c.deliverStream()
if stream == nil {
logger.Warnf("The stream has closed. Terminating loop.\n")
logger.Warnf("The stream has closed. Terminating loop.")
break
}

in, err := stream.Recv()

logger.Debugf("Got deliver response: %#v", in)

if c.Closed() {
logger.Debugf("The connection has closed. Terminating loop.\n")
logger.Debugf("The connection has closed with error [%s]. Terminating loop.", err)
break
}

if err == io.EOF {
// This signifies that the stream has been terminated at the client-side. No need to send an event.
logger.Debugf("Received EOF from stream.\n")
logger.Debugf("Received EOF from stream.")
break
}

if err != nil {
logger.Errorf("Received error from stream: [%s]. Sending disconnected event.\n", err)
logger.Errorf("Received error from stream: [%s]. Sending disconnected event.", err)
eventch <- clientdisp.NewDisconnectedEvent(err)
break
}

eventch <- in
}
logger.Debugf("Exiting stream listener\n")
logger.Debugf("Exiting stream listener")
}

func (c *DeliverConnection) createSignedEnvelope(msg proto.Message) (*cb.Envelope, error) {
Expand All @@ -145,7 +147,6 @@ func (c *DeliverConnection) createSignedEnvelope(msg proto.Message) (*cb.Envelop

payloadChannelHeader := utils.MakeChannelHeader(cb.HeaderType_DELIVER_SEEK_INFO, msgVersion, c.ChannelID(), epoch)
payloadChannelHeader.TlsCertHash = c.TLSCertHash()
var err error

data, err := proto.Marshal(msg)
if err != nil {
Expand All @@ -172,7 +173,7 @@ func (c *DeliverConnection) createSignedEnvelope(msg proto.Message) (*cb.Envelop
Data: data,
})

signature, err := c.Context().SigningManager().Sign(data, c.Context().PrivateKey())
signature, err := c.Context().SigningManager().Sign(paylBytes, c.Context().PrivateKey())
if err != nil {
return nil, err
}
Expand Down
71 changes: 31 additions & 40 deletions pkg/fab/events/deliverclient/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"google.golang.org/grpc/keepalive"

fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/comm"
clientdisp "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
Expand Down Expand Up @@ -68,6 +67,33 @@ func TestConnection(t *testing.T) {
t.Fatalf("error creating new connection: %s", err)
}

conn.Close()

// Calling close again should be ignored
conn.Close()
}

func TestForbiddenConnection(t *testing.T) {
expectedStatus := cb.Status_FORBIDDEN
deliverServer.SetStatus(expectedStatus)
defer deliverServer.SetStatus(cb.Status_UNKNOWN)

channelID := "mychannel"
conn, err := New(newMockContext(), channelID, Deliver, peerURL,
comm.WithConnectTimeout(3*time.Second),
comm.WithFailFast(true),
comm.WithKeepAliveParams(
keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
},
),
)
if err != nil {
t.Fatalf("error creating new connection: %s", err)
}

eventch := make(chan interface{})

go conn.Receive(eventch)
Expand All @@ -77,21 +103,15 @@ func TestConnection(t *testing.T) {
if !ok {
t.Fatalf("unexpected closed connection")
}
deliverResponse, ok := e.(*pb.DeliverResponse)
if !ok {
t.Fatalf("expected deliver response but got %T", e)
}
if deliverResponse.GetStatus() != cb.Status_SUCCESS {
t.Fatalf("expected deliver response status [%s] but got [%s]", cb.Status_SUCCESS, deliverResponse.GetStatus())
statusResponse := e.(*pb.DeliverResponse).Type.(*pb.DeliverResponse_Status)
if statusResponse.Status != expectedStatus {
t.Fatalf("expecting status %s but got %s", expectedStatus, statusResponse.Status)
}
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for event")
}

conn.Close()

// Calling close again should be ignored
conn.Close()
}

func TestSend(t *testing.T) {
Expand All @@ -116,19 +136,6 @@ func TestDisconnected(t *testing.T) {

deliverServer.Disconnect(errors.New("simulating disconnect"))

select {
case e, ok := <-eventch:
if !ok {
t.Fatalf("unexpected closed connection")
}
statusResponse := e.(*pb.DeliverResponse).Type.(*pb.DeliverResponse_Status)
if statusResponse.Status != cb.Status_SUCCESS {
t.Fatalf("expecting status %s but got %s", cb.Status_SUCCESS, statusResponse.Status)
}
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for event")
}

if err := conn.Send(seek.InfoNewest()); err != nil {
t.Fatalf("error sending seek request for channel [%s]: err", err)
}
Expand Down Expand Up @@ -167,22 +174,6 @@ func testSend(t *testing.T, streamType streamType) {

go conn.Receive(eventch)

select {
case e, ok := <-eventch:
if !ok {
t.Fatalf("unexpected closed connection")
}
deliverResponse, ok := e.(*pb.DeliverResponse)
if !ok {
t.Fatalf("expected deliver response but got %T", e)
}
if deliverResponse.GetStatus() != cb.Status_SUCCESS {
t.Fatalf("expected deliver response status [%s] but got [%s]", cb.Status_SUCCESS, deliverResponse.GetStatus())
}
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for event")
}

if err := conn.Send(seek.InfoNewest()); err != nil {
t.Fatalf("error sending seek request for channel [%s]: err", err)
}
Expand Down Expand Up @@ -231,6 +222,6 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

func newMockContext() fabcontext.Client {
func newMockContext() *fabmocks.MockContext {
return fabmocks.NewMockContext(fabmocks.NewMockUser("user1"))
}
13 changes: 3 additions & 10 deletions pkg/fab/events/deliverclient/deliverclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package deliverclient

import (
"math"
"sync"
"time"

ab "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/protos/orderer"
Expand Down Expand Up @@ -38,14 +37,8 @@ var deliverFilteredProvider = func(channelID string, context fabcontext.Client,

// Client connects to a peer and receives channel events, such as bock, filtered block, chaincode, and transaction status events.
type Client struct {
sync.RWMutex
client.Client
params
connEvent chan *fab.ConnectionEvent
connectionState int32
stopped int32
registerOnce sync.Once
blockEventsPermitted bool
}

// New returns a new deliver event client
Expand Down Expand Up @@ -76,7 +69,7 @@ func New(context fabcontext.Client, channelID string, discoveryService fab.Disco
}

func (c *Client) seek() error {
logger.Debugf("sending seek request....\n")
logger.Debugf("Sending seek request....")

seekInfo, err := c.seekInfo()
if err != nil {
Expand All @@ -93,11 +86,11 @@ func (c *Client) seek() error {
}

if err != nil {
logger.Errorf("unable to send seek request: %s\n", err)
logger.Errorf("Unable to send seek request: %s", err)
return err
}

logger.Debugf("successfully sent seek\n")
logger.Debugf("Successfully sent seek")
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/fab/events/deliverclient/deliverclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestClientConnect(t *testing.T) {
withConnectionProvider(
clientmocks.NewProviderFactory().Provider(
delivermocks.NewConnection(
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
clientmocks.WithLedger(servicemocks.NewMockLedger(delivermocks.BlockEventFactory)),
),
),
true,
Expand Down Expand Up @@ -185,7 +185,7 @@ func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome clientmo
withConnectionProvider(
cp.FlakeyProvider(
connAttemptResult,
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
clientmocks.WithLedger(servicemocks.NewMockLedger(delivermocks.BlockEventFactory)),
clientmocks.WithFactory(func(opts ...clientmocks.Opt) clientmocks.Connection {
return delivermocks.NewConnection(opts...)
}),
Expand Down Expand Up @@ -216,7 +216,7 @@ func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expe
cp := clientmocks.NewProviderFactory()

connectch := make(chan *fab.ConnectionEvent)
ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory)
ledger := servicemocks.NewMockLedger(delivermocks.BlockEventFactory)

eventClient, err := New(
newMockContext(), "mychannel",
Expand Down Expand Up @@ -276,7 +276,7 @@ func testReconnectRegistration(t *testing.T, connectResults clientmocks.ConnectA
channelID := "mychannel"
ccID := "mycc"

ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory)
ledger := servicemocks.NewMockLedger(delivermocks.BlockEventFactory)

// Add 2 blocks to the ledger befor the client has connected
ledger.NewBlock(channelID,
Expand Down
Loading

0 comments on commit 8e5aa5d

Please sign in to comment.