Skip to content

Commit

Permalink
Retry logic for evaluate
Browse files Browse the repository at this point in the history
Modify the evaluate method to retry processing of a transaction proposal on another peer in the case of an error.

Signed-off-by: andrew-coleman <andrew_coleman@uk.ibm.com>
  • Loading branch information
andrew-coleman authored and manish-sethi committed Oct 29, 2021
1 parent 7ebb704 commit 31bc120
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 38 deletions.
41 changes: 28 additions & 13 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g
transientProtected = true
}

endorser, err := gs.registry.evaluator(channel, chaincodeID, targetOrgs)
plan, err := gs.registry.evaluator(channel, chaincodeID, targetOrgs)
if err != nil {
if transientProtected {
return nil, status.Errorf(codes.Unavailable, "no endorsers found in the gateway's organization; retry specifying target organization(s) to protect transient data: %s", err)
Expand All @@ -64,24 +64,39 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g
ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout)
defer cancel()

response, err := endorser.client.ProcessProposal(ctx, signedProposal)
if err != nil {
logger.Debugw("Evaluate call to endorser failed", "chaincode", chaincodeID, "channel", channel, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "error", err)
return nil, wrappedRpcError(err, "failed to evaluate transaction", errorDetail(endorser.endpointConfig, err))
}
endorser := plan.endorsers()[0]
var response *peer.Response
var errDetails []proto.Message
for response == nil {
gs.logger.Debugw("Sending to peer:", "channel", channel, "chaincode", chaincodeID, "MSPID", endorser.mspid, "endpoint", endorser.address)

pr, err := endorser.client.ProcessProposal(ctx, signedProposal)
if err != nil {
logger.Debugw("Evaluate call to endorser failed", "chaincode", chaincodeID, "channel", channel, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "error", err)
errDetails = append(errDetails, errorDetail(endorser.endpointConfig, err))
gs.registry.removeEndorser(endorser)
endorser = plan.retry(endorser)
if endorser == nil {
return nil, rpcError(codes.Aborted, "failed to evaluate transaction, see attached details for more info", errDetails...)
}
}

rr := response.GetResponse()
if rr != nil && (rr.Status < 200 || rr.Status >= 400) {
logger.Debugw("Evaluate call to endorser returned a malformed or error response", "chaincode", chaincodeID, "channel", channel, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "status", rr.Status, "message", rr.Message)
err = fmt.Errorf("error %d returned from chaincode %s on channel %s: %s", rr.Status, chaincodeID, channel, rr.Message)
return nil, rpcError(codes.Unknown, "error returned from chaincode: "+rr.Message, errorDetail(endorser.endpointConfig, err))
response = pr.GetResponse()
if response != nil && (response.Status < 200 || response.Status >= 400) {
logger.Debugw("Evaluate call to endorser returned a malformed or error response", "chaincode", chaincodeID, "channel", channel, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "status", response.Status, "message", response.Message)
err = fmt.Errorf("error %d returned from chaincode %s on channel %s: %s", response.Status, chaincodeID, channel, response.Message)
endpointErr := errorDetail(endorser.endpointConfig, err)
errDetails = append(errDetails, endpointErr)
// this is a chaincode error response - don't retry
return nil, rpcError(codes.Aborted, "evaluate call to endorser returned an error response, see attached details for more info", errDetails...)
}
}

evaluateResponse := &gp.EvaluateResponse{
Result: rr,
Result: response,
}

logger.Debugw("Evaluate call to endorser returned success", "channel", request.ChannelId, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "status", rr.GetStatus(), "message", rr.GetMessage())
logger.Debugw("Evaluate call to endorser returned success", "channel", request.ChannelId, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "status", response.GetStatus(), "message", response.GetMessage())
return evaluateResponse, nil
}

Expand Down
57 changes: 55 additions & 2 deletions internal/pkg/gateway/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func TestEvaluate(t *testing.T) {
endpointDefinition: &endpointDef{
proposalError: status.Error(codes.Aborted, "wibble"),
},
errString: "rpc error: code = Aborted desc = failed to evaluate transaction: wibble",
errString: "rpc error: code = Aborted desc = failed to evaluate transaction, see attached details for more info",
errDetails: []*pb.ErrorDetail{{
Address: "localhost:7051",
MspId: "msp1",
Expand All @@ -316,13 +316,66 @@ func TestEvaluate(t *testing.T) {
proposalResponseStatus: 400,
proposalResponseMessage: "Mock chaincode error",
},
errString: "rpc error: code = Unknown desc = error returned from chaincode: Mock chaincode error",
errString: "rpc error: code = Aborted desc = evaluate call to endorser returned an error response, see attached details for more info",
errDetails: []*pb.ErrorDetail{{
Address: "peer1:8051",
MspId: "msp1",
Message: "error 400 returned from chaincode test_chaincode on channel test_channel: Mock chaincode error",
}},
},
{
name: "evaluate on local org fails - retry in other org",
members: []networkMember{
{"id1", "localhost:7051", "msp1", 4},
{"id2", "peer1:8051", "msp1", 4},
{"id3", "peer2:9051", "msp2", 3},
{"id4", "peer3:10051", "msp2", 4},
{"id5", "peer4:11051", "msp3", 5},
},
plan: endorsementPlan{
"g1": {{endorser: localhostMock, height: 4}, {endorser: peer1Mock, height: 4}}, // msp1
"g2": {{endorser: peer2Mock, height: 3}, {endorser: peer3Mock, height: 4}}, // msp2
"g3": {{endorser: peer4Mock, height: 5}}, // msp3
},
postSetup: func(t *testing.T, def *preparedTest) {
def.localEndorser.ProcessProposalReturns(nil, status.Error(codes.Aborted, "bad local endorser"))
peer1Mock.client.(*mocks.EndorserClient).ProcessProposalReturns(nil, status.Error(codes.Aborted, "bad peer1 endorser"))
},
expectedEndorsers: []string{"peer4:11051"},
},
{
name: "restrict to local org peers - which all fail",
members: []networkMember{
{"id1", "localhost:7051", "msp1", 4},
{"id2", "peer1:8051", "msp1", 4},
{"id3", "peer2:9051", "msp2", 3},
{"id4", "peer3:10051", "msp2", 4},
{"id5", "peer4:11051", "msp3", 5},
},
plan: endorsementPlan{
"g1": {{endorser: localhostMock, height: 4}, {endorser: peer1Mock, height: 4}}, // msp1
"g2": {{endorser: peer2Mock, height: 3}, {endorser: peer3Mock, height: 4}}, // msp2
"g3": {{endorser: peer4Mock, height: 5}}, // msp3
},
postSetup: func(t *testing.T, def *preparedTest) {
def.localEndorser.ProcessProposalReturns(nil, status.Error(codes.Aborted, "bad local endorser"))
peer1Mock.client.(*mocks.EndorserClient).ProcessProposalReturns(nil, status.Error(codes.Aborted, "bad peer1 endorser"))
},
endorsingOrgs: []string{"msp1"},
errString: "rpc error: code = Aborted desc = failed to evaluate transaction, see attached details for more info",
errDetails: []*pb.ErrorDetail{
{
Address: "localhost:7051",
MspId: "msp1",
Message: "rpc error: code = Aborted desc = bad local endorser",
},
{
Address: "peer1:8051",
MspId: "msp1",
Message: "rpc error: code = Aborted desc = bad peer1 endorser",
},
},
},
{
name: "dialing endorser endpoint fails",
members: []networkMember{
Expand Down
45 changes: 22 additions & 23 deletions internal/pkg/gateway/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ func (reg *registry) endorsersByOrg(channel string, chaincode string) map[string
return endorsersByOrg
}

// evaluator returns a single endorser, preferably from local org, if available
// evaluator returns a plan representing a single endorsement, preferably from local org, if available
// targetOrgs specifies the orgs that are allowed receive the request, due to private data restrictions
func (reg *registry) evaluator(channel string, chaincode string, targetOrgs []string) (*endorser, error) {
func (reg *registry) evaluator(channel string, chaincode string, targetOrgs []string) (*plan, error) {
endorsersByOrg := reg.endorsersByOrg(channel, chaincode)

// If no targetOrgs are specified (i.e. no restrictions), then populate with all available orgs
Expand All @@ -205,21 +205,29 @@ func (reg *registry) evaluator(channel string, chaincode string, targetOrgs []st
targetOrgs = append(targetOrgs, org)
}
}
// Prefer a local org endorser, if present
if e, ok := endorsersByOrg[reg.localEndorser.mspid]; ok && contains(targetOrgs, reg.localEndorser.mspid) {
return e[0].endorser, nil
}
// Otherwise highest block height peer (first in list) from another org
var evaluator *endorser
var maxHeight uint64

localOrgEndorsers := []*endorserState{}
otherOrgEndorsers := []*endorserState{}
for _, org := range targetOrgs {
if e, ok := endorsersByOrg[org]; ok && e[0].height > maxHeight {
evaluator = e[0].endorser
maxHeight = e[0].height
if es, ok := endorsersByOrg[org]; ok {
if org == reg.localEndorser.mspid {
localOrgEndorsers = es
} else {
otherOrgEndorsers = append(otherOrgEndorsers, es...)
}
}
}
if evaluator != nil {
return evaluator, nil
// sort all the 'other orgs' endorsers by decreasing block height
sort.Slice(otherOrgEndorsers, sorter(otherOrgEndorsers, ""))

var allEndorsers []*endorser
for _, e := range append(localOrgEndorsers, otherOrgEndorsers...) {
allEndorsers = append(allEndorsers, e.endorser)
}
if len(allEndorsers) > 0 {
layout := []*layout{{required: map[string]int{"g1": 1}}} // single layout, one group, one endorsement
groupEndorsers := map[string][]*endorser{"g1": allEndorsers}
return newPlan(layout, groupEndorsers), nil
}
return nil, fmt.Errorf("no endorsing peers found for chaincode %s in channel %s", chaincode, channel)
}
Expand All @@ -234,15 +242,6 @@ func sorter(e []*endorserState, host string) func(i, j int) bool {
}
}

func contains(slice []string, entry string) bool {
for _, item := range slice {
if entry == item {
return true
}
}
return false
}

// Returns a set of broadcastClients that can order a transaction for the given channel.
func (reg *registry) orderers(channel string) ([]*orderer, error) {
var orderers []*orderer
Expand Down

0 comments on commit 31bc120

Please sign in to comment.