Skip to content

Commit

Permalink
[FAB-8039] Split Endorsement Handler
Browse files Browse the repository at this point in the history
Split the endorsement handler into two
handlers:

- Proposal Processor Handler
- Endorsement Handler

This allows for more granular
reuse/replacement of handlers.

Change-Id: I122ef44fdfb5b7a1de3c62a646f06cd8244269f6
Signed-off-by: Bob Stasyszyn <bob.stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Feb 2, 2018
1 parent 913ef17 commit b8f2be7
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 58 deletions.
6 changes: 0 additions & 6 deletions api/apitxn/chclient/chclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ type Opts struct {
//Option func for each Opts argument
type Option func(opts *Opts) error

// TxProposalResponseFilter allows the user to inspect/modify response before commit
type TxProposalResponseFilter interface {
// process transaction proposal response (there will be no commit if an error is returned)
ProcessTxProposalResponse(txProposalResponse []*apifabclient.TransactionProposalResponse) ([]*apifabclient.TransactionProposalResponse, error)
}

// Registration is a handle that is returned from a successful Register Chaincode Event.
// This handle should be used in Unregister in order to unregister the event.
type Registration interface {
Expand Down
3 changes: 0 additions & 3 deletions pkg/fabric-client/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ type Channel struct {
// is enforced by the ordering service and must be unique within the blockchain network.
// client: Provides operational context such as submitting User etc.
func New(ctx fab.Context, cfg fab.ChannelCfg) (*Channel, error) {
if cfg.Name() == "" {
return nil, errors.Errorf("name is required")
}
if ctx == nil {
return nil, errors.Errorf("client is required")
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/fabric-client/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,8 @@ func TestChannelMethods(t *testing.T) {
}

_, err = New(ctx, mocks.NewMockChannelCfg(""))
if err == nil {
t.Fatalf("New didn't return error")
}
if err.Error() != "name is required" {
t.Fatalf("New didn't return right error")
if err != nil {
t.Fatalf("Got error creating channel with empty channel ID: %s", err)
}

_, err = New(nil, mocks.NewMockChannelCfg("testChannel"))
Expand Down Expand Up @@ -178,9 +175,13 @@ func isValueInList(value string, list []string) bool {
}

func setupTestChannel() (*Channel, error) {
return setupChannel("testChannel")
}

func setupChannel(channelID string) (*Channel, error) {
user := mocks.NewMockUser("test")
ctx := mocks.NewMockContext(user)
return New(ctx, mocks.NewMockChannelCfg("testChannel"))
return New(ctx, mocks.NewMockChannelCfg(channelID))
}

func setupMassiveTestChannel(numberOfPeers int, numberOfOrderers int) (*Channel, error) {
Expand Down
18 changes: 18 additions & 0 deletions pkg/fabric-client/channel/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ func TestQueryMethods(t *testing.T) {

}

func TestQueryOnSystemChannel(t *testing.T) {
channel, _ := setupChannel(systemChannel)
peer := mocks.MockPeer{MockName: "Peer1", MockURL: "http://peer1.com", MockRoles: []string{}, MockCert: nil}
err := channel.AddPeer(&peer)
if err != nil {
t.Fatalf("Error adding peer to channel: %s", err)
}

request := fab.ChaincodeInvokeRequest{
ChaincodeID: "ccID",
Fcn: "method",
Args: [][]byte{[]byte("arg")},
}
if _, err := channel.QueryByChaincode(request); err != nil {
t.Fatalf("Error invoking chaincode on system channel: %s", err)
}
}

func TestChannelQueryBlock(t *testing.T) {

channel, _ := setupTestChannel()
Expand Down
66 changes: 65 additions & 1 deletion pkg/fabric-txn/chclient/chclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/channel"
fcmocks "github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/peer"
"github.com/hyperledger/fabric-sdk-go/pkg/fabric-txn/internal"
txnmocks "github.com/hyperledger/fabric-sdk-go/pkg/fabric-txn/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/fabric-txn/txnhandler"
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -211,6 +213,64 @@ func TestInvokeHandler(t *testing.T) {
}
}

// customEndorsementHandler ignores the channel in the ClientContext
// and instead sends the proposal to the given channel
type customEndorsementHandler struct {
channel apifabclient.Channel
next chclient.Handler
}

func (h *customEndorsementHandler) Handle(requestContext *chclient.RequestContext, clientContext *chclient.ClientContext) {
transactionProposalResponses, txnID, err := internal.CreateAndSendTransactionProposal(h.channel,
requestContext.Request.ChaincodeID, requestContext.Request.Fcn, requestContext.Request.Args, requestContext.Opts.ProposalProcessors, requestContext.Request.TransientMap)

requestContext.Response.TransactionID = txnID

if err != nil {
requestContext.Error = err
return
}

requestContext.Response.Responses = transactionProposalResponses
if len(transactionProposalResponses) > 0 {
requestContext.Response.Payload = transactionProposalResponses[0].ProposalResponse.GetResponse().Payload
}

//Delegate to next step if any
if h.next != nil {
h.next.Handle(requestContext, clientContext)
}
}

func TestQueryWithCustomEndorser(t *testing.T) {
chClient := setupChannelClient(nil, t)

// Use the customEndorsementHandler to send the proposal to
// the system channel instead of the channel in context

systemChannel, err := setupChannel("")
if err != nil {
t.Fatalf("Error getting system channel: %s", err)
}

response, err := chClient.InvokeHandler(
txnhandler.NewProposalProcessorHandler(
&customEndorsementHandler{
channel: systemChannel,
next: txnhandler.NewEndorsementValidationHandler(),
},
),
chclient.Request{ChaincodeID: "testCC", Fcn: "invoke", Args: [][]byte{[]byte("query"), []byte("b")}},
)
if err != nil {
t.Fatalf("Failed to invoke test cc: %s", err)
}

if response.Payload != nil {
t.Fatalf("Expecting nil, got %s", response.Payload)
}
}

func TestExecuteTxDiscoveryError(t *testing.T) {
chClient := setupChannelClientWithError(errors.New("Test Error"), nil, nil, t)

Expand Down Expand Up @@ -326,8 +386,12 @@ func TestExecuteTxWithRetries(t *testing.T) {
}

func setupTestChannel() (*channel.Channel, error) {
return setupChannel("testChannel")
}

func setupChannel(channelID string) (*channel.Channel, error) {
ctx := setupTestContext()
return channel.New(ctx, fcmocks.NewMockChannelCfg("testChannel"))
return channel.New(ctx, fcmocks.NewMockChannelCfg(channelID))
}

func setupTestContext() apifabclient.Context {
Expand Down
82 changes: 53 additions & 29 deletions pkg/fabric-txn/txnhandler/txnhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,45 @@ import (
"github.com/pkg/errors"
)

//EndorseTxHandler for handling endorse transactions
type EndorseTxHandler struct {
//EndorsementHandler for handling endorse transactions
type EndorsementHandler struct {
next chclient.Handler
}

//Handle for endorsing transactions
func (e *EndorseTxHandler) Handle(requestContext *chclient.RequestContext, clientContext *chclient.ClientContext) {
func (e *EndorsementHandler) Handle(requestContext *chclient.RequestContext, clientContext *chclient.ClientContext) {
// Endorse Tx
transactionProposalResponses, txnID, err := internal.CreateAndSendTransactionProposal(clientContext.Channel,
requestContext.Request.ChaincodeID, requestContext.Request.Fcn, requestContext.Request.Args, requestContext.Opts.ProposalProcessors, requestContext.Request.TransientMap)

requestContext.Response.TransactionID = txnID

if err != nil {
requestContext.Error = err
return
}

requestContext.Response.Responses = transactionProposalResponses
if len(transactionProposalResponses) > 0 {
requestContext.Response.Payload = transactionProposalResponses[0].ProposalResponse.GetResponse().Payload
}

//Delegate to next step if any
if e.next != nil {
e.next.Handle(requestContext, clientContext)
}
}

//ProposalProcessorHandler for selecting proposal processors
type ProposalProcessorHandler struct {
next chclient.Handler
}

//Handle selects proposal processors
func (h *ProposalProcessorHandler) Handle(requestContext *chclient.RequestContext, clientContext *chclient.ClientContext) {
//Get proposal processor, if not supplied then use discovery service to get available peers as endorser
//If selection service available then get endorser peers for this chaincode
txProcessors := requestContext.Opts.ProposalProcessors
if len(txProcessors) == 0 {
if len(requestContext.Opts.ProposalProcessors) == 0 {
// Use discovery service to figure out proposal processors
peers, err := clientContext.Discovery.GetPeers()
if err != nil {
Expand All @@ -46,28 +73,12 @@ func (e *EndorseTxHandler) Handle(requestContext *chclient.RequestContext, clien
return
}
}
txProcessors = peer.PeersToTxnProcessors(endorsers)
}

// Endorse Tx
transactionProposalResponses, txnID, err := internal.CreateAndSendTransactionProposal(clientContext.Channel,
requestContext.Request.ChaincodeID, requestContext.Request.Fcn, requestContext.Request.Args, txProcessors, requestContext.Request.TransientMap)

requestContext.Response.TransactionID = txnID

if err != nil {
requestContext.Error = err
return
}

requestContext.Response.Responses = transactionProposalResponses
if len(transactionProposalResponses) > 0 {
requestContext.Response.Payload = transactionProposalResponses[0].ProposalResponse.GetResponse().Payload
requestContext.Opts.ProposalProcessors = peer.PeersToTxnProcessors(endorsers)
}

//Delegate to next step if any
if e.next != nil {
e.next.Handle(requestContext, clientContext)
if h.next != nil {
h.next.Handle(requestContext, clientContext)
}
}

Expand Down Expand Up @@ -160,17 +171,30 @@ func (c *CommitTxHandler) Handle(requestContext *chclient.RequestContext, client

//NewQueryHandler returns query handler with EndorseTxHandler & EndorsementValidationHandler Chained
func NewQueryHandler(next ...chclient.Handler) chclient.Handler {
return NewEndorseHandler(NewEndorsementValidationHandler(next...))
return NewProposalProcessorHandler(
NewEndorsementHandler(
NewEndorsementValidationHandler(next...),
),
)
}

//NewExecuteHandler returns query handler with EndorseTxHandler, EndorsementValidationHandler & CommitTxHandler Chained
func NewExecuteHandler(next ...chclient.Handler) chclient.Handler {
return NewEndorseHandler(NewEndorsementValidationHandler(NewCommitHandler(next...)))
return NewProposalProcessorHandler(
NewEndorsementHandler(
NewEndorsementValidationHandler(NewCommitHandler(next...)),
),
)
}

//NewProposalProcessorHandler returns a handler that selects proposal processors
func NewProposalProcessorHandler(next ...chclient.Handler) *ProposalProcessorHandler {
return &ProposalProcessorHandler{next: getNext(next)}
}

//NewEndorseHandler returns a handler that endorses a transaction proposal
func NewEndorseHandler(next ...chclient.Handler) *EndorseTxHandler {
return &EndorseTxHandler{next: getNext(next)}
//NewEndorsementHandler returns a handler that endorses a transaction proposal
func NewEndorsementHandler(next ...chclient.Handler) *EndorsementHandler {
return &EndorsementHandler{next: getNext(next)}
}

//NewEndorsementValidationHandler returns a handler that validates an endorsement
Expand Down
54 changes: 54 additions & 0 deletions pkg/fabric-txn/txnhandler/txnhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,60 @@ func TestQueryHandlerErrors(t *testing.T) {
}
}

func TestEndorsementHandler(t *testing.T) {
request := chclient.Request{ChaincodeID: "test", Fcn: "invoke", Args: [][]byte{[]byte("move"), []byte("a"), []byte("b"), []byte("1")}}

requestContext := prepareRequestContext(request, chclient.Opts{ProposalProcessors: []apifabclient.ProposalProcessor{fcmocks.NewMockPeer("p2", "")}}, t)
clientContext := setupChannelClientContext(nil, nil, nil, t)

handler := NewEndorsementHandler()
handler.Handle(requestContext, clientContext)
assert.Nil(t, requestContext.Error)
}

func TestProposalProcessorHandler(t *testing.T) {
peer1 := fcmocks.NewMockPeer("p1", "")
peer2 := fcmocks.NewMockPeer("p2", "")
discoveryPeers := []apifabclient.Peer{peer1, peer2}

//Get query handler
handler := NewProposalProcessorHandler()

request := chclient.Request{ChaincodeID: "testCC", Fcn: "invoke", Args: [][]byte{[]byte("query"), []byte("b")}}

selectionErr := errors.New("Some selection error")
requestContext := prepareRequestContext(request, chclient.Opts{}, t)
handler.Handle(requestContext, setupChannelClientContext(nil, selectionErr, discoveryPeers, t))
if requestContext.Error == nil || !strings.Contains(requestContext.Error.Error(), selectionErr.Error()) {
t.Fatal("Expected error: ", selectionErr, ", Received error:", requestContext.Error)
}

requestContext = prepareRequestContext(request, chclient.Opts{}, t)
handler.Handle(requestContext, setupChannelClientContext(nil, nil, discoveryPeers, t))
if requestContext.Error != nil {
t.Fatalf("Got error: %s", requestContext.Error)
}
if len(requestContext.Opts.ProposalProcessors) != len(discoveryPeers) {
t.Fatalf("Expecting %d proposal processors but got %d", len(discoveryPeers), len(requestContext.Opts.ProposalProcessors))
}
if requestContext.Opts.ProposalProcessors[0] != peer1 || requestContext.Opts.ProposalProcessors[1] != peer2 {
t.Fatalf("Didn't get expected peers")
}

// Directly pass in the proposal processors. In this case it should use those directly
requestContext = prepareRequestContext(request, chclient.Opts{ProposalProcessors: []apifabclient.ProposalProcessor{peer2}}, t)
handler.Handle(requestContext, setupChannelClientContext(nil, nil, discoveryPeers, t))
if requestContext.Error != nil {
t.Fatalf("Got error: %s", requestContext.Error)
}
if len(requestContext.Opts.ProposalProcessors) != 1 {
t.Fatalf("Expecting 1 proposal processor but got %d", len(requestContext.Opts.ProposalProcessors))
}
if requestContext.Opts.ProposalProcessors[0] != peer2 {
t.Fatalf("Didn't get expected peers")
}
}

//prepareHandlerContexts prepares context objects for handlers
func prepareRequestContext(request chclient.Request, opts chclient.Opts, t *testing.T) *chclient.RequestContext {

Expand Down
28 changes: 15 additions & 13 deletions test/integration/sdk/channel_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,19 +176,21 @@ func testInvokeHandler(ccID string, chClient chclient.ChannelClient, t *testing.
txValidationCode := pb.TxValidationCode(-1)

response, err := chClient.InvokeHandler(
txnhandler.NewEndorseHandler(
txnhandler.NewEndorsementValidationHandler(
&testHandler{
t: t,
txID: &txID,
endorser: &endorser,
next: txnhandler.NewCommitHandler(
&testHandler{
t: t,
txValidationCode: &txValidationCode,
},
),
},
txnhandler.NewProposalProcessorHandler(
txnhandler.NewEndorsementHandler(
txnhandler.NewEndorsementValidationHandler(
&testHandler{
t: t,
txID: &txID,
endorser: &endorser,
next: txnhandler.NewCommitHandler(
&testHandler{
t: t,
txValidationCode: &txValidationCode,
},
),
},
),
),
),
chclient.Request{
Expand Down

0 comments on commit b8f2be7

Please sign in to comment.