From fe06786f3c4c4320f7588ae03f267ad3507c57da Mon Sep 17 00:00:00 2001 From: yacovm Date: Sun, 9 Jul 2017 23:43:45 +0300 Subject: [PATCH] [FAB-5520] Make Broadcast select a random orderer The implementation of SendTransaction sends the transaction to all ordering service nodes. There is no reason to send the same transaction to all ordering service nodes. This change set makes a random ordering service instance be selected instead of sending to everyone. In case the selected (random) orderer returns a bad status, the code now tries to contact other endpoints until it succeeds or all have been tried. Change-Id: Idf5c8be69c1f90c62c7ebc164531fa740e670843 Signed-off-by: yacovm --- api/apitxn/sender.go | 2 +- pkg/fabric-client/channel/txnsender.go | 79 +++++++++++--------- pkg/fabric-client/channel/txnsender_test.go | 83 ++++++++++++++++++--- pkg/fabric-txn/internal/common.go | 12 ++- test/integration/base_test_setup.go | 18 ++--- 5 files changed, 130 insertions(+), 64 deletions(-) diff --git a/api/apitxn/sender.go b/api/apitxn/sender.go index 353bb39776..e32cf8d311 100644 --- a/api/apitxn/sender.go +++ b/api/apitxn/sender.go @@ -14,7 +14,7 @@ import ( // Sender provides the ability for a transaction to be created and sent. type Sender interface { CreateTransaction(resps []*TransactionProposalResponse) (*Transaction, error) - SendTransaction(tx *Transaction) ([]*TransactionResponse, error) + SendTransaction(tx *Transaction) (*TransactionResponse, error) } // The Transaction object created from an endorsed proposal. diff --git a/pkg/fabric-client/channel/txnsender.go b/pkg/fabric-client/channel/txnsender.go index 0d1a3da505..c58e68ad85 100644 --- a/pkg/fabric-client/channel/txnsender.go +++ b/pkg/fabric-client/channel/txnsender.go @@ -8,24 +8,27 @@ package channel import ( "fmt" + "math/rand" "sync" "time" "github.com/golang/protobuf/proto" proto_ts "github.com/golang/protobuf/ptypes/timestamp" - "github.com/hyperledger/fabric/protos/common" - - "github.com/hyperledger/fabric/bccsp" - mspprotos "github.com/hyperledger/fabric/protos/msp" - pb "github.com/hyperledger/fabric/protos/peer" - protos_utils "github.com/hyperledger/fabric/protos/utils" - fab "github.com/hyperledger/fabric-sdk-go/api/apifabclient" "github.com/hyperledger/fabric-sdk-go/api/apitxn" fc "github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/internal" "github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/internal/txnproc" + "github.com/hyperledger/fabric/bccsp" + "github.com/hyperledger/fabric/protos/common" + mspprotos "github.com/hyperledger/fabric/protos/msp" + pb "github.com/hyperledger/fabric/protos/peer" + protos_utils "github.com/hyperledger/fabric/protos/utils" ) +func init() { + rand.Seed(time.Now().UnixNano()) +} + // CreateTransaction create a transaction with proposal response, following the endorsement policy. func (c *Channel) CreateTransaction(resps []*apitxn.TransactionProposalResponse) (*apitxn.Transaction, error) { if len(resps) == 0 { @@ -109,7 +112,7 @@ func (c *Channel) CreateTransaction(resps []*apitxn.TransactionProposalResponse) } // SendTransaction send a transaction to the chain’s orderer service (one or more orderer endpoints) for consensus and committing to the ledger. -func (c *Channel) SendTransaction(tx *apitxn.Transaction) ([]*apitxn.TransactionResponse, error) { +func (c *Channel) SendTransaction(tx *apitxn.Transaction) (*apitxn.TransactionResponse, error) { if c.orderers == nil || len(c.orderers) == 0 { return nil, fmt.Errorf("orderers is nil") } @@ -144,12 +147,12 @@ func (c *Channel) SendTransaction(tx *apitxn.Transaction) ([]*apitxn.Transaction return nil, err } - transactionResponses, err := c.BroadcastEnvelope(envelope) + transactionResponse, err := c.BroadcastEnvelope(envelope) if err != nil { return nil, err } - return transactionResponses, nil + return transactionResponse, nil } // SendInstantiateProposal sends an instantiate proposal to one or more endorsing peers. @@ -239,41 +242,43 @@ func (c *Channel) SignPayload(payload []byte) (*fab.SignedEnvelope, error) { return &fab.SignedEnvelope{Payload: payload, Signature: signature}, nil } -// BroadcastEnvelope will send the given envelope to each orderer -func (c *Channel) BroadcastEnvelope(envelope *fab.SignedEnvelope) ([]*apitxn.TransactionResponse, error) { +// BroadcastEnvelope will send the given envelope to some orderer, picking random endpoints +// until all are exhausted +func (c *Channel) BroadcastEnvelope(envelope *fab.SignedEnvelope) (*apitxn.TransactionResponse, error) { // Check if orderers are defined - if c.orderers == nil || len(c.orderers) == 0 { + if len(c.orderers) == 0 { return nil, fmt.Errorf("orderers not set") } - var responseMtx sync.Mutex - var transactionResponses []*apitxn.TransactionResponse - var wg sync.WaitGroup - + // Copy aside the ordering service endpoints + orderers := []fab.Orderer{} for _, o := range c.orderers { - wg.Add(1) - go func(orderer fab.Orderer) { - defer wg.Done() - var transactionResponse *apitxn.TransactionResponse - - logger.Debugf("Broadcasting envelope to orderer :%s\n", orderer.URL()) - if _, err := orderer.SendBroadcast(envelope); err != nil { - logger.Debugf("Receive Error Response from orderer :%v\n", err) - transactionResponse = &apitxn.TransactionResponse{Orderer: orderer.URL(), - Err: fmt.Errorf("Error calling orderer '%s': %s", orderer.URL(), err)} - } else { - logger.Debugf("Receive Success Response from orderer\n") - transactionResponse = &apitxn.TransactionResponse{Orderer: orderer.URL(), Err: nil} - } + orderers = append(orderers, o) + } - responseMtx.Lock() - transactionResponses = append(transactionResponses, transactionResponse) - responseMtx.Unlock() - }(o) + // Iterate them in a random order and try broadcasting 1 by 1 + var errResp *apitxn.TransactionResponse + for _, i := range rand.Perm(len(orderers)) { + resp := c.sendBroadcast(envelope, orderers[i]) + if resp.Err != nil { + errResp = resp + } else { + return resp, nil + } } - wg.Wait() + return errResp, nil +} - return transactionResponses, nil +func (c *Channel) sendBroadcast(envelope *fab.SignedEnvelope, orderer fab.Orderer) *apitxn.TransactionResponse { + logger.Debugf("Broadcasting envelope to orderer :%s\n", orderer.URL()) + if _, err := orderer.SendBroadcast(envelope); err != nil { + logger.Debugf("Receive Error Response from orderer :%v\n", err) + return &apitxn.TransactionResponse{Orderer: orderer.URL(), + Err: fmt.Errorf("Error calling orderer '%s': %s", orderer.URL(), err)} + } else { + logger.Debugf("Receive Success Response from orderer\n") + return &apitxn.TransactionResponse{Orderer: orderer.URL(), Err: nil} + } } // SendEnvelope sends the given envelope to each orderer and returns a block response diff --git a/pkg/fabric-client/channel/txnsender_test.go b/pkg/fabric-client/channel/txnsender_test.go index d89334e007..03213fc459 100644 --- a/pkg/fabric-client/channel/txnsender_test.go +++ b/pkg/fabric-client/channel/txnsender_test.go @@ -7,8 +7,11 @@ SPDX-License-Identifier: Apache-2.0 package channel import ( + "crypto/rand" + "fmt" "os" "strconv" + "strings" "testing" "time" @@ -181,19 +184,33 @@ func TestSendInstantiateProposal(t *testing.T) { if err == nil || err.Error() != "Missing peer objects for instantiate CC proposal" { t.Fatal("Missing peer objects validation is not working as expected") } +} +type mockReader struct { + err error } -func TestBroadcastEnvelope(t *testing.T) { +func (r *mockReader) Read(p []byte) (int, error) { + if r.err != nil { + return 0, r.err + } + n, _ := rand.Read(p) + return n, nil +} +func TestBroadcastEnvelope(t *testing.T) { //Setup channel channel, _ := setupTestChannel() - //Create mock orderer - orderer := mocks.NewMockOrderer("", nil) + lsnr1 := make(chan *fab.SignedEnvelope) + lsnr2 := make(chan *fab.SignedEnvelope) + //Create mock orderers + orderer1 := mocks.NewMockOrderer("1", lsnr1) + orderer2 := mocks.NewMockOrderer("2", lsnr2) - //Add an orderer - channel.AddOrderer(orderer) + //Add the orderers + channel.AddOrderer(orderer1) + channel.AddOrderer(orderer2) peer := mocks.MockPeer{MockName: "Peer1", MockURL: "http://peer1.com", MockRoles: []string{}, MockCert: nil} channel.AddPeer(&peer) @@ -204,17 +221,65 @@ func TestBroadcastEnvelope(t *testing.T) { } res, err := channel.BroadcastEnvelope(sigEnvelope) - if err != nil || res == nil { - t.Fatalf("Test Broadcast Envelope Failed, cause %s", err.Error()) + if err != nil || res.Err != nil { + t.Fatalf("Test Broadcast Envelope Failed, cause %v %v", err, res) + } + + // Ensure only 1 orderer was selected for broadcast + firstSelected := 0 + secondSelected := 0 + for i := 0; i < 2; i++ { + select { + case <-lsnr1: + firstSelected = 1 + case <-lsnr2: + secondSelected = 1 + case <-time.After(time.Second): + } } - channel.RemoveOrderer(orderer) + if firstSelected+secondSelected != 1 { + t.Fatal("Both or none orderers were selected for broadcast:", firstSelected+secondSelected) + } + + // Now make 1 of them fail and repeatedly broadcast + broadcastCount := 50 + for i := 0; i < broadcastCount; i++ { + orderer1.(mocks.MockOrderer).EnqueueSendBroadcastError(fmt.Errorf("Service Unavailable")) + } + // It should always succeed even though one of them has failed + for i := 0; i < broadcastCount; i++ { + if res, err := channel.BroadcastEnvelope(sigEnvelope); err != nil || res.Err != nil { + t.Fatalf("Test Broadcast Envelope Failed, cause %v %v", err, res) + } + } + + // Now, fail both and ensure any attempt fails + for i := 0; i < broadcastCount; i++ { + orderer1.(mocks.MockOrderer).EnqueueSendBroadcastError(fmt.Errorf("Service Unavailable")) + orderer2.(mocks.MockOrderer).EnqueueSendBroadcastError(fmt.Errorf("Service Unavailable")) + } + + for i := 0; i < broadcastCount; i++ { + res, err := channel.BroadcastEnvelope(sigEnvelope) + if err != nil { + t.Fatalf("Test Broadcast sending failed, cause %v", err) + } + if res.Err == nil { + t.Fatal("Test Broadcast succeeded, but it should have failed") + } + if !strings.Contains(res.Err.Error(), "Service Unavailable") { + t.Fatal("Test Broadcast failed but didn't return the correct reason(should contain 'Service Unavailable')") + } + } + + channel.RemoveOrderer(orderer1) + channel.RemoveOrderer(orderer2) _, err = channel.BroadcastEnvelope(sigEnvelope) if err == nil || err.Error() != "orderers not set" { t.Fatal("orderers not set validation on broadcast envelope is not working as expected") } - } func TestSendTransaction(t *testing.T) { diff --git a/pkg/fabric-txn/internal/common.go b/pkg/fabric-txn/internal/common.go index dc81efb4c6..4ad206bfe7 100644 --- a/pkg/fabric-txn/internal/common.go +++ b/pkg/fabric-txn/internal/common.go @@ -44,25 +44,23 @@ func CreateAndSendTransactionProposal(sender apitxn.ProposalSender, chainCodeID } // CreateAndSendTransaction ... -func CreateAndSendTransaction(sender apitxn.Sender, resps []*apitxn.TransactionProposalResponse) ([]*apitxn.TransactionResponse, error) { +func CreateAndSendTransaction(sender apitxn.Sender, resps []*apitxn.TransactionProposalResponse) (*apitxn.TransactionResponse, error) { tx, err := sender.CreateTransaction(resps) if err != nil { return nil, fmt.Errorf("CreateTransaction returned error: %v", err) } - transactionResponses, err := sender.SendTransaction(tx) + transactionResponse, err := sender.SendTransaction(tx) if err != nil { return nil, fmt.Errorf("SendTransaction returned error: %v", err) } - for _, v := range transactionResponses { - if v.Err != nil { - return nil, fmt.Errorf("Orderer %s returned error: %v", v.Orderer, v.Err) - } + if transactionResponse.Err != nil { + return nil, fmt.Errorf("Orderer %s returned error: %v", transactionResponse.Orderer, transactionResponse.Err) } - return transactionResponses, nil + return transactionResponse, nil } // RegisterTxEvent registers on the given eventhub for the given transaction id diff --git a/test/integration/base_test_setup.go b/test/integration/base_test_setup.go index 792533697d..f813f1274a 100644 --- a/test/integration/base_test_setup.go +++ b/test/integration/base_test_setup.go @@ -12,15 +12,14 @@ import ( "path" "time" - "github.com/hyperledger/fabric-sdk-go/api/apitxn" - "github.com/hyperledger/fabric-sdk-go/pkg/config" - "github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/events" - "github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/orderer" - "github.com/hyperledger/fabric-sdk-go/api/apiconfig" ca "github.com/hyperledger/fabric-sdk-go/api/apifabca" fab "github.com/hyperledger/fabric-sdk-go/api/apifabclient" + "github.com/hyperledger/fabric-sdk-go/api/apitxn" deffab "github.com/hyperledger/fabric-sdk-go/def/fabapi" + "github.com/hyperledger/fabric-sdk-go/pkg/config" + "github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/events" + "github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/orderer" fabricTxn "github.com/hyperledger/fabric-sdk-go/pkg/fabric-txn" admin "github.com/hyperledger/fabric-sdk-go/pkg/fabric-txn/admin" bccspFactory "github.com/hyperledger/fabric/bccsp/factory" @@ -299,7 +298,7 @@ func (setup *BaseSetupImpl) CreateAndSendTransactionProposal(channel fab.Channel } // CreateAndSendTransaction ... -func (setup *BaseSetupImpl) CreateAndSendTransaction(channel fab.Channel, resps []*apitxn.TransactionProposalResponse) ([]*apitxn.TransactionResponse, error) { +func (setup *BaseSetupImpl) CreateAndSendTransaction(channel fab.Channel, resps []*apitxn.TransactionProposalResponse) (*apitxn.TransactionResponse, error) { tx, err := channel.CreateTransaction(resps) if err != nil { @@ -311,10 +310,9 @@ func (setup *BaseSetupImpl) CreateAndSendTransaction(channel fab.Channel, resps return nil, fmt.Errorf("SendTransaction return error: %v", err) } - for _, v := range transactionResponse { - if v.Err != nil { - return nil, fmt.Errorf("Orderer %s return error: %v", v.Orderer, v.Err) - } + + if transactionResponse.Err != nil { + return nil, fmt.Errorf("Orderer %s return error: %v", transactionResponse.Orderer, transactionResponse.Err) } return transactionResponse, nil