Skip to content

Commit

Permalink
[FAB-8717] Request Context
Browse files Browse the repository at this point in the history
This change adds a Go Context creation method
to the SDK's context package for request scopes. The comm
manager is also added to the request scope. Calls to
orderer and peer methods that Dial are modified to
first create a context using the new context
creation method.

Integration tests were also refactored to use the same
SDK instance for initialiation and usage.

Change-Id: I095016e6da5f8eade13c1f5dcf4d8f9fb287e6ed
Signed-off-by: Troy Ronda <troy@troyronda.com>
  • Loading branch information
troyronda committed Mar 9, 2018
1 parent 4664f80 commit d94bd30
Show file tree
Hide file tree
Showing 44 changed files with 351 additions and 278 deletions.
4 changes: 1 addition & 3 deletions pkg/client/channel/chclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,7 @@ func TestOrdererStatusError(t *testing.T) {
chClient := setupChannelClientWithNodes(peers, orderers, t)
chClient.eventHub = fcmocks.NewMockEventHub()

mockOrderer, ok := testOrderer1.(fcmocks.MockOrderer)
assert.True(t, ok, "Expected object to be mock orderer")
mockOrderer.EnqueueSendBroadcastError(status.New(status.OrdererClientStatus,
testOrderer1.EnqueueSendBroadcastError(status.New(status.OrdererClientStatus,
status.ConnectionFailed.ToInt32(), testErrorMessage, nil))

_, err := chClient.Execute(Request{ChaincodeID: "test", Fcn: "invoke",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"fmt"
"sync"

"github.com/hyperledger/fabric-sdk-go/pkg/context"
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/context"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
"github.com/pkg/errors"
Expand Down Expand Up @@ -53,7 +53,7 @@ type selectionService struct {
}

// Initialize allow for initializing providers
func (p *SelectionProvider) Initialize(providers *context.Provider) error {
func (p *SelectionProvider) Initialize(providers contextAPI.Providers) error {
p.providers = providers
return nil
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/client/resmgmt/resmgmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestJoinChannel(t *testing.T) {

// Create mock orderer with simple mock block
orderer := fcmocks.NewMockOrderer("", nil)
orderer.(fcmocks.MockOrderer).EnqueueForSendDeliver(fcmocks.NewSimpleMockBlock())
orderer.EnqueueForSendDeliver(fcmocks.NewSimpleMockBlock())
rc := setupResMgmtClient(ctx, nil, t)

// Setup target peers
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestJoinChannelWithFilter(t *testing.T) {

// Create mock orderer with simple mock block
orderer := fcmocks.NewMockOrderer("", nil)
orderer.(fcmocks.MockOrderer).EnqueueForSendDeliver(fcmocks.NewSimpleMockBlock())
orderer.EnqueueForSendDeliver(fcmocks.NewSimpleMockBlock())
//the target filter ( client option) will be set
rc := setupResMgmtClient(ctx, nil, t)

Expand Down Expand Up @@ -263,7 +263,8 @@ func TestJoinChannelNoOrdererConfig(t *testing.T) {
t.Fatal(err)
}
ctx.SetConfig(invalidOrdererConfig)
customFabProvider := fabpvdr.New(ctx)
customFabProvider := fabpvdr.New(ctx.Config())
customFabProvider.Initialize(ctx)
ctx.SetCustomInfraProvider(customFabProvider)

rc = setupResMgmtClient(ctx, nil, t)
Expand Down
12 changes: 11 additions & 1 deletion pkg/context/api/fab/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ SPDX-License-Identifier: Apache-2.0
package fab

import (
reqContext "context"

"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
"google.golang.org/grpc"
)

// InfraProvider enables access to fabric objects such as peer and user based on config or
// InfraProvider enables access to fabric objects such as peer based on config
type InfraProvider interface {
CreateChannelLedger(ic IdentityContext, name string) (ChannelLedger, error)
CreateChannelConfig(user IdentityContext, name string) (ChannelConfig, error)
Expand All @@ -19,6 +22,7 @@ type InfraProvider interface {
CreateEventHub(ic IdentityContext, name string) (EventHub, error)
CreatePeerFromConfig(peerCfg *core.NetworkPeer) (Peer, error)
CreateOrdererFromConfig(cfg *core.OrdererConfig) (Orderer, error)
CommManager() CommManager
Close()
}

Expand Down Expand Up @@ -50,6 +54,12 @@ type TargetFilter interface {
Accept(peer Peer) bool
}

// CommManager enables network communication.
type CommManager interface {
DialContext(ctx reqContext.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error)
ReleaseConn(conn *grpc.ClientConn)
}

// Providers represents the SDK configured service providers context.
type Providers interface {
DiscoveryProvider() DiscoveryProvider
Expand Down
20 changes: 19 additions & 1 deletion pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ SPDX-License-Identifier: Apache-2.0
package context

import (
reqContext "context"
"strings"

"github.com/pkg/errors"

"github.com/hyperledger/fabric-sdk-go/pkg/common/context"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
"github.com/pkg/errors"
)

// Client supplies the configuration and signing identity to client objects.
Expand Down Expand Up @@ -215,3 +217,19 @@ func NewChannel(clientProvider context.ClientProvider, channelID string) (*Chann
channelService: channelService,
}, nil
}

type reqContextKey string

var reqContextCommManager = reqContextKey("commManager")

// NewRequest creates a request-scoped context.
func NewRequest(client context.Client) reqContext.Context {
ctx := reqContext.WithValue(reqContext.Background(), reqContextCommManager, client.InfraProvider().CommManager())
return ctx
}

// RequestCommManager extracts the CommManager from the request-scoped context.
func RequestCommManager(ctx reqContext.Context) (fab.CommManager, bool) {
commManager, ok := ctx.Value(reqContextCommManager).(fab.CommManager)
return commManager, ok
}
8 changes: 6 additions & 2 deletions pkg/fab/mocks/mockfabricprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,14 @@ func (f *MockInfraProvider) CreatePeerFromConfig(peerCfg *core.NetworkPeer) (fab

// CreateOrdererFromConfig creates a default implementation of Orderer based on configuration.
func (f *MockInfraProvider) CreateOrdererFromConfig(cfg *core.OrdererConfig) (fab.Orderer, error) {
return &mockOrderer{}, nil
return &MockOrderer{}, nil
}

//CommManager returns comm provider
func (f *MockInfraProvider) CommManager() fab.CommManager {
return nil
}

//Close mock close function
func (f *MockInfraProvider) Close() {
return
}
59 changes: 37 additions & 22 deletions pkg/fab/mocks/mockorderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,7 @@ import (
// Nothe that calling broadcast doesn't deliver anythng. This implies
// that the broadcast side and the deliver side are totally
// independent from the mocking point of view.
type MockOrderer interface {
fab.Orderer
// Enqueues a mock error to be returned to the client calling SendBroadcast
EnqueueSendBroadcastError(err error)
// Enqueues a mock value (block or error) for delivery
EnqueueForSendDeliver(value interface{})
}
type mockOrderer struct {
type MockOrderer struct {
OrdererURL string
BroadcastListener chan *fab.SignedEnvelope
BroadcastErrors chan error
Expand All @@ -37,8 +30,8 @@ type mockOrderer struct {
}

// NewMockOrderer ...
func NewMockOrderer(url string, broadcastListener chan *fab.SignedEnvelope) fab.Orderer {
o := &mockOrderer{
func NewMockOrderer(url string, broadcastListener chan *fab.SignedEnvelope) *MockOrderer {
o := &MockOrderer{
OrdererURL: url,
BroadcastListener: broadcastListener,
BroadcastErrors: make(chan error, 100),
Expand All @@ -48,22 +41,35 @@ func NewMockOrderer(url string, broadcastListener chan *fab.SignedEnvelope) fab.
DeliveryQueue: make(chan interface{}, 100),
}

go broadcast(o)
if broadcastListener != nil {
go broadcast(o)
}
go delivery(o)
return o
}

func broadcast(o *mockOrderer) {
func broadcast(o *MockOrderer) {
for {
value := <-o.BroadcastQueue
value, ok := <-o.BroadcastQueue
if !ok {
close(o.BroadcastListener)
return
}
o.BroadcastListener <- value
}
}

func delivery(o *mockOrderer) {
func delivery(o *MockOrderer) {
for {
value := <-o.DeliveryQueue
value, ok := <-o.DeliveryQueue
if !ok {
close(o.Deliveries)
return
}
switch value.(type) {
case common.Status:
close(o.Deliveries)
return
case *common.Block:
o.Deliveries <- value.(*common.Block)
case error:
Expand All @@ -75,13 +81,13 @@ func delivery(o *mockOrderer) {
}

// URL returns the URL of the mock Orderer
func (o *mockOrderer) URL() string {
func (o *MockOrderer) URL() string {
return o.OrdererURL
}

// SendBroadcast accepts client broadcast calls and reports them to the listener channel
// Returns the first enqueued error, or nil if there are no enqueued errors
func (o *mockOrderer) SendBroadcast(ctx reqContext.Context, envelope *fab.SignedEnvelope) (*common.Status, error) {
func (o *MockOrderer) SendBroadcast(ctx reqContext.Context, envelope *fab.SignedEnvelope) (*common.Status, error) {
// Report this call to the listener
if o.BroadcastListener != nil {
o.BroadcastQueue <- envelope
Expand All @@ -95,21 +101,30 @@ func (o *mockOrderer) SendBroadcast(ctx reqContext.Context, envelope *fab.Signed
}

// SendDeliver returns the channels for delivery of prepared mock values and errors (if any)
func (o *mockOrderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelope) (chan *common.Block, chan error) {
func (o *MockOrderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelope) (chan *common.Block, chan error) {
return o.Deliveries, o.DeliveryErrors
}

func (o *mockOrderer) EnqueueSendBroadcastError(err error) {
// Close cleans up the instance and ends goroutines
func (o *MockOrderer) Close() {
close(o.BroadcastQueue)
close(o.DeliveryQueue)
}

// EnqueueSendBroadcastError enqueues error
func (o *MockOrderer) EnqueueSendBroadcastError(err error) {
o.BroadcastErrors <- err
}

// EnqueueForSendDeliver enqueues a mock value (block or error) for delivery
func (o *mockOrderer) EnqueueForSendDeliver(value interface{}) {
func (o *MockOrderer) EnqueueForSendDeliver(value interface{}) {
switch value.(type) {
case common.Status:
o.DeliveryQueue <- value
case *common.Block:
o.DeliveryQueue <- value.(*common.Block)
o.DeliveryQueue <- value
case error:
o.DeliveryQueue <- value.(error)
o.DeliveryQueue <- value
default:
panic(fmt.Sprintf("Value not *common.Block nor error: %v", value))
}
Expand Down
52 changes: 27 additions & 25 deletions pkg/fab/orderer/orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
grpcstatus "google.golang.org/grpc/status"

ab "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric-sdk-go/pkg/context"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/core/config/comm"
Expand All @@ -30,11 +31,6 @@ import (

var logger = logging.NewLogger("fabsdk/fab")

type connProvider interface {
DialContext(ctx reqContext.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error)
ReleaseConn(conn *grpc.ClientConn)
}

// Orderer allows a client to broadcast a transaction.
type Orderer struct {
config core.Config
Expand All @@ -47,7 +43,7 @@ type Orderer struct {
failFast bool
transportCredentials credentials.TransportCredentials
allowInsecure bool
connector connProvider
commManager fab.CommManager
}

// Option describes a functional parameter for the New constructor
Expand All @@ -56,8 +52,8 @@ type Option func(*Orderer) error
// New Returns a Orderer instance
func New(config core.Config, opts ...Option) (*Orderer, error) {
orderer := &Orderer{
config: config,
connector: &defConnector{},
config: config,
commManager: &defCommManager{},
}

for _, opt := range opts {
Expand Down Expand Up @@ -127,15 +123,6 @@ func WithInsecure() Option {
}
}

// WithConnProvider allows a custom GRPC connection provider to be used.
func WithConnProvider(provider connProvider) Option {
return func(p *Orderer) error {
p.connector = provider

return nil
}
}

// FromOrdererConfig is a functional option for the orderer.New constructor that configures a new orderer
// from a apiconfig.OrdererConfig struct
func FromOrdererConfig(ordererCfg *core.OrdererConfig) Option {
Expand Down Expand Up @@ -222,7 +209,21 @@ func (o *Orderer) conn(ctx reqContext.Context) (*grpc.ClientConn, error) {
ctx, cancel := reqContext.WithTimeout(ctx, o.dialTimeout)
defer cancel()

return o.connector.DialContext(ctx, o.url, o.grpcDialOption...)
commManager, ok := context.RequestCommManager(ctx)
if !ok {
commManager = o.commManager
}

return commManager.DialContext(ctx, o.url, o.grpcDialOption...)
}

func (o *Orderer) releaseConn(ctx reqContext.Context, conn *grpc.ClientConn) {
commManager, ok := context.RequestCommManager(ctx)
if !ok {
commManager = o.commManager
}

commManager.ReleaseConn(conn)
}

// URL Get the Orderer url. Required property for the instance objects.
Expand All @@ -242,7 +243,7 @@ func (o *Orderer) SendBroadcast(ctx reqContext.Context, envelope *fab.SignedEnve

return nil, status.New(status.OrdererClientStatus, status.ConnectionFailed.ToInt32(), err.Error(), nil)
}
defer o.connector.ReleaseConn(conn)
defer o.releaseConn(ctx, conn)

broadcastStream, err := ab.NewAtomicBroadcastClient(conn).Broadcast(ctx)
if err != nil {
Expand Down Expand Up @@ -316,7 +317,7 @@ func (o *Orderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelo
broadcastStream, err := ab.NewAtomicBroadcastClient(conn).Deliver(ctx)
if err != nil {
logger.Errorf("deliver failed [%s]", err)
o.connector.ReleaseConn(conn)
o.commManager.ReleaseConn(conn)

errs <- errors.Wrap(err, "deliver failed")
return responses, errs
Expand All @@ -327,15 +328,16 @@ func (o *Orderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelo
Payload: envelope.Payload,
Signature: envelope.Signature,
}); err != nil {
o.connector.ReleaseConn(conn)
o.commManager.ReleaseConn(conn)

errs <- errors.Wrap(err, "failed to send block request to orderer")
return responses, errs
}
broadcastStream.CloseSend()

// Receive blocks from the GRPC stream and put them on the channel
go func() {
defer o.connector.ReleaseConn(conn)
defer o.commManager.ReleaseConn(conn)
blockStream(broadcastStream, responses, errs)

}()
Expand Down Expand Up @@ -374,13 +376,13 @@ func blockStream(broadcastStream ab.AtomicBroadcast_DeliverClient, responses cha
}
}

type defConnector struct{}
type defCommManager struct{}

func (*defConnector) DialContext(ctx reqContext.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
func (*defCommManager) DialContext(ctx reqContext.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
opts = append(opts, grpc.WithBlock())
return grpc.DialContext(ctx, target, opts...)
}

func (*defConnector) ReleaseConn(conn *grpc.ClientConn) {
func (*defCommManager) ReleaseConn(conn *grpc.ClientConn) {
conn.Close()
}
Loading

0 comments on commit d94bd30

Please sign in to comment.