Skip to content

Commit

Permalink
[FAB-5520] Make Broadcast select a random orderer
Browse files Browse the repository at this point in the history
The implementation of SendTransaction sends the transaction
to all ordering service nodes.
There is no reason to send the same transaction to all ordering service
nodes.

This change set makes a random ordering service instance be selected
instead of sending to everyone.
In case the selected (random) orderer returns a bad status, the code now
tries to contact other endpoints until it succeeds or all have been tried.

Change-Id: Idf5c8be69c1f90c62c7ebc164531fa740e670843
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Jul 10, 2017
1 parent 223e728 commit fe06786
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 64 deletions.
2 changes: 1 addition & 1 deletion api/apitxn/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// Sender provides the ability for a transaction to be created and sent.
type Sender interface {
CreateTransaction(resps []*TransactionProposalResponse) (*Transaction, error)
SendTransaction(tx *Transaction) ([]*TransactionResponse, error)
SendTransaction(tx *Transaction) (*TransactionResponse, error)
}

// The Transaction object created from an endorsed proposal.
Expand Down
79 changes: 42 additions & 37 deletions pkg/fabric-client/channel/txnsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,27 @@ package channel

import (
"fmt"
"math/rand"
"sync"
"time"

"github.com/golang/protobuf/proto"
proto_ts "github.com/golang/protobuf/ptypes/timestamp"
"github.com/hyperledger/fabric/protos/common"

"github.com/hyperledger/fabric/bccsp"
mspprotos "github.com/hyperledger/fabric/protos/msp"
pb "github.com/hyperledger/fabric/protos/peer"
protos_utils "github.com/hyperledger/fabric/protos/utils"

fab "github.com/hyperledger/fabric-sdk-go/api/apifabclient"
"github.com/hyperledger/fabric-sdk-go/api/apitxn"
fc "github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/internal"
"github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/internal/txnproc"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/protos/common"
mspprotos "github.com/hyperledger/fabric/protos/msp"
pb "github.com/hyperledger/fabric/protos/peer"
protos_utils "github.com/hyperledger/fabric/protos/utils"
)

func init() {
rand.Seed(time.Now().UnixNano())
}

// CreateTransaction create a transaction with proposal response, following the endorsement policy.
func (c *Channel) CreateTransaction(resps []*apitxn.TransactionProposalResponse) (*apitxn.Transaction, error) {
if len(resps) == 0 {
Expand Down Expand Up @@ -109,7 +112,7 @@ func (c *Channel) CreateTransaction(resps []*apitxn.TransactionProposalResponse)
}

// SendTransaction send a transaction to the chain’s orderer service (one or more orderer endpoints) for consensus and committing to the ledger.
func (c *Channel) SendTransaction(tx *apitxn.Transaction) ([]*apitxn.TransactionResponse, error) {
func (c *Channel) SendTransaction(tx *apitxn.Transaction) (*apitxn.TransactionResponse, error) {
if c.orderers == nil || len(c.orderers) == 0 {
return nil, fmt.Errorf("orderers is nil")
}
Expand Down Expand Up @@ -144,12 +147,12 @@ func (c *Channel) SendTransaction(tx *apitxn.Transaction) ([]*apitxn.Transaction
return nil, err
}

transactionResponses, err := c.BroadcastEnvelope(envelope)
transactionResponse, err := c.BroadcastEnvelope(envelope)
if err != nil {
return nil, err
}

return transactionResponses, nil
return transactionResponse, nil
}

// SendInstantiateProposal sends an instantiate proposal to one or more endorsing peers.
Expand Down Expand Up @@ -239,41 +242,43 @@ func (c *Channel) SignPayload(payload []byte) (*fab.SignedEnvelope, error) {
return &fab.SignedEnvelope{Payload: payload, Signature: signature}, nil
}

// BroadcastEnvelope will send the given envelope to each orderer
func (c *Channel) BroadcastEnvelope(envelope *fab.SignedEnvelope) ([]*apitxn.TransactionResponse, error) {
// BroadcastEnvelope will send the given envelope to some orderer, picking random endpoints
// until all are exhausted
func (c *Channel) BroadcastEnvelope(envelope *fab.SignedEnvelope) (*apitxn.TransactionResponse, error) {
// Check if orderers are defined
if c.orderers == nil || len(c.orderers) == 0 {
if len(c.orderers) == 0 {
return nil, fmt.Errorf("orderers not set")
}

var responseMtx sync.Mutex
var transactionResponses []*apitxn.TransactionResponse
var wg sync.WaitGroup

// Copy aside the ordering service endpoints
orderers := []fab.Orderer{}
for _, o := range c.orderers {
wg.Add(1)
go func(orderer fab.Orderer) {
defer wg.Done()
var transactionResponse *apitxn.TransactionResponse

logger.Debugf("Broadcasting envelope to orderer :%s\n", orderer.URL())
if _, err := orderer.SendBroadcast(envelope); err != nil {
logger.Debugf("Receive Error Response from orderer :%v\n", err)
transactionResponse = &apitxn.TransactionResponse{Orderer: orderer.URL(),
Err: fmt.Errorf("Error calling orderer '%s': %s", orderer.URL(), err)}
} else {
logger.Debugf("Receive Success Response from orderer\n")
transactionResponse = &apitxn.TransactionResponse{Orderer: orderer.URL(), Err: nil}
}
orderers = append(orderers, o)
}

responseMtx.Lock()
transactionResponses = append(transactionResponses, transactionResponse)
responseMtx.Unlock()
}(o)
// Iterate them in a random order and try broadcasting 1 by 1
var errResp *apitxn.TransactionResponse
for _, i := range rand.Perm(len(orderers)) {
resp := c.sendBroadcast(envelope, orderers[i])
if resp.Err != nil {
errResp = resp
} else {
return resp, nil
}
}
wg.Wait()
return errResp, nil
}

return transactionResponses, nil
func (c *Channel) sendBroadcast(envelope *fab.SignedEnvelope, orderer fab.Orderer) *apitxn.TransactionResponse {
logger.Debugf("Broadcasting envelope to orderer :%s\n", orderer.URL())
if _, err := orderer.SendBroadcast(envelope); err != nil {
logger.Debugf("Receive Error Response from orderer :%v\n", err)
return &apitxn.TransactionResponse{Orderer: orderer.URL(),
Err: fmt.Errorf("Error calling orderer '%s': %s", orderer.URL(), err)}
} else {
logger.Debugf("Receive Success Response from orderer\n")
return &apitxn.TransactionResponse{Orderer: orderer.URL(), Err: nil}
}
}

// SendEnvelope sends the given envelope to each orderer and returns a block response
Expand Down
83 changes: 74 additions & 9 deletions pkg/fabric-client/channel/txnsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ SPDX-License-Identifier: Apache-2.0
package channel

import (
"crypto/rand"
"fmt"
"os"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -181,19 +184,33 @@ func TestSendInstantiateProposal(t *testing.T) {
if err == nil || err.Error() != "Missing peer objects for instantiate CC proposal" {
t.Fatal("Missing peer objects validation is not working as expected")
}
}

type mockReader struct {
err error
}

func TestBroadcastEnvelope(t *testing.T) {
func (r *mockReader) Read(p []byte) (int, error) {
if r.err != nil {
return 0, r.err
}
n, _ := rand.Read(p)
return n, nil
}

func TestBroadcastEnvelope(t *testing.T) {
//Setup channel
channel, _ := setupTestChannel()

//Create mock orderer
orderer := mocks.NewMockOrderer("", nil)
lsnr1 := make(chan *fab.SignedEnvelope)
lsnr2 := make(chan *fab.SignedEnvelope)
//Create mock orderers
orderer1 := mocks.NewMockOrderer("1", lsnr1)
orderer2 := mocks.NewMockOrderer("2", lsnr2)

//Add an orderer
channel.AddOrderer(orderer)
//Add the orderers
channel.AddOrderer(orderer1)
channel.AddOrderer(orderer2)

peer := mocks.MockPeer{MockName: "Peer1", MockURL: "http://peer1.com", MockRoles: []string{}, MockCert: nil}
channel.AddPeer(&peer)
Expand All @@ -204,17 +221,65 @@ func TestBroadcastEnvelope(t *testing.T) {
}
res, err := channel.BroadcastEnvelope(sigEnvelope)

if err != nil || res == nil {
t.Fatalf("Test Broadcast Envelope Failed, cause %s", err.Error())
if err != nil || res.Err != nil {
t.Fatalf("Test Broadcast Envelope Failed, cause %v %v", err, res)
}

// Ensure only 1 orderer was selected for broadcast
firstSelected := 0
secondSelected := 0
for i := 0; i < 2; i++ {
select {
case <-lsnr1:
firstSelected = 1
case <-lsnr2:
secondSelected = 1
case <-time.After(time.Second):
}
}

channel.RemoveOrderer(orderer)
if firstSelected+secondSelected != 1 {
t.Fatal("Both or none orderers were selected for broadcast:", firstSelected+secondSelected)
}

// Now make 1 of them fail and repeatedly broadcast
broadcastCount := 50
for i := 0; i < broadcastCount; i++ {
orderer1.(mocks.MockOrderer).EnqueueSendBroadcastError(fmt.Errorf("Service Unavailable"))
}
// It should always succeed even though one of them has failed
for i := 0; i < broadcastCount; i++ {
if res, err := channel.BroadcastEnvelope(sigEnvelope); err != nil || res.Err != nil {
t.Fatalf("Test Broadcast Envelope Failed, cause %v %v", err, res)
}
}

// Now, fail both and ensure any attempt fails
for i := 0; i < broadcastCount; i++ {
orderer1.(mocks.MockOrderer).EnqueueSendBroadcastError(fmt.Errorf("Service Unavailable"))
orderer2.(mocks.MockOrderer).EnqueueSendBroadcastError(fmt.Errorf("Service Unavailable"))
}

for i := 0; i < broadcastCount; i++ {
res, err := channel.BroadcastEnvelope(sigEnvelope)
if err != nil {
t.Fatalf("Test Broadcast sending failed, cause %v", err)
}
if res.Err == nil {
t.Fatal("Test Broadcast succeeded, but it should have failed")
}
if !strings.Contains(res.Err.Error(), "Service Unavailable") {
t.Fatal("Test Broadcast failed but didn't return the correct reason(should contain 'Service Unavailable')")
}
}

channel.RemoveOrderer(orderer1)
channel.RemoveOrderer(orderer2)
_, err = channel.BroadcastEnvelope(sigEnvelope)

if err == nil || err.Error() != "orderers not set" {
t.Fatal("orderers not set validation on broadcast envelope is not working as expected")
}

}

func TestSendTransaction(t *testing.T) {
Expand Down
12 changes: 5 additions & 7 deletions pkg/fabric-txn/internal/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,23 @@ func CreateAndSendTransactionProposal(sender apitxn.ProposalSender, chainCodeID
}

// CreateAndSendTransaction ...
func CreateAndSendTransaction(sender apitxn.Sender, resps []*apitxn.TransactionProposalResponse) ([]*apitxn.TransactionResponse, error) {
func CreateAndSendTransaction(sender apitxn.Sender, resps []*apitxn.TransactionProposalResponse) (*apitxn.TransactionResponse, error) {

tx, err := sender.CreateTransaction(resps)
if err != nil {
return nil, fmt.Errorf("CreateTransaction returned error: %v", err)
}

transactionResponses, err := sender.SendTransaction(tx)
transactionResponse, err := sender.SendTransaction(tx)
if err != nil {
return nil, fmt.Errorf("SendTransaction returned error: %v", err)

}
for _, v := range transactionResponses {
if v.Err != nil {
return nil, fmt.Errorf("Orderer %s returned error: %v", v.Orderer, v.Err)
}
if transactionResponse.Err != nil {
return nil, fmt.Errorf("Orderer %s returned error: %v", transactionResponse.Orderer, transactionResponse.Err)
}

return transactionResponses, nil
return transactionResponse, nil
}

// RegisterTxEvent registers on the given eventhub for the given transaction id
Expand Down
18 changes: 8 additions & 10 deletions test/integration/base_test_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ import (
"path"
"time"

"github.com/hyperledger/fabric-sdk-go/api/apitxn"
"github.com/hyperledger/fabric-sdk-go/pkg/config"
"github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/events"
"github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/orderer"

"github.com/hyperledger/fabric-sdk-go/api/apiconfig"
ca "github.com/hyperledger/fabric-sdk-go/api/apifabca"
fab "github.com/hyperledger/fabric-sdk-go/api/apifabclient"
"github.com/hyperledger/fabric-sdk-go/api/apitxn"
deffab "github.com/hyperledger/fabric-sdk-go/def/fabapi"
"github.com/hyperledger/fabric-sdk-go/pkg/config"
"github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/events"
"github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/orderer"
fabricTxn "github.com/hyperledger/fabric-sdk-go/pkg/fabric-txn"
admin "github.com/hyperledger/fabric-sdk-go/pkg/fabric-txn/admin"
bccspFactory "github.com/hyperledger/fabric/bccsp/factory"
Expand Down Expand Up @@ -299,7 +298,7 @@ func (setup *BaseSetupImpl) CreateAndSendTransactionProposal(channel fab.Channel
}

// CreateAndSendTransaction ...
func (setup *BaseSetupImpl) CreateAndSendTransaction(channel fab.Channel, resps []*apitxn.TransactionProposalResponse) ([]*apitxn.TransactionResponse, error) {
func (setup *BaseSetupImpl) CreateAndSendTransaction(channel fab.Channel, resps []*apitxn.TransactionProposalResponse) (*apitxn.TransactionResponse, error) {

tx, err := channel.CreateTransaction(resps)
if err != nil {
Expand All @@ -311,10 +310,9 @@ func (setup *BaseSetupImpl) CreateAndSendTransaction(channel fab.Channel, resps
return nil, fmt.Errorf("SendTransaction return error: %v", err)

}
for _, v := range transactionResponse {
if v.Err != nil {
return nil, fmt.Errorf("Orderer %s return error: %v", v.Orderer, v.Err)
}

if transactionResponse.Err != nil {
return nil, fmt.Errorf("Orderer %s return error: %v", transactionResponse.Orderer, transactionResponse.Err)
}

return transactionResponse, nil
Expand Down

0 comments on commit fe06786

Please sign in to comment.