Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine Gateway gRPC error status codes #3075

Merged
merged 1 commit into from
Nov 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 45 additions & 36 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package gateway
import (
"context"
"fmt"
"io"
"math/rand"
"strings"
"sync"
Expand Down Expand Up @@ -59,9 +60,9 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g
plan, err := gs.registry.evaluator(channel, chaincodeID, targetOrgs)
if err != nil {
if transientProtected {
return nil, status.Errorf(codes.Unavailable, "no endorsers found in the gateway's organization; retry specifying target organization(s) to protect transient data: %s", err)
return nil, status.Errorf(codes.FailedPrecondition, "no endorsers found in the gateway's organization; retry specifying target organization(s) to protect transient data: %s", err)
}
return nil, status.Errorf(codes.Unavailable, "%s", err)
return nil, status.Errorf(codes.FailedPrecondition, "%s", err)
}

endorser := plan.endorsers()[0]
Expand All @@ -76,8 +77,8 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g
ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout)
defer cancel()
pr, err := endorser.client.ProcessProposal(ctx, signedProposal)
success, message, retry, remove := gs.responseStatus(pr, err)
if success {
code, message, retry, remove := gs.responseStatus(pr, err)
if code == codes.OK {
response = pr.Response
// Prefer result from proposal response as Response.Payload is not required to be transaction result
if result, err := getResultFromProposalResponse(pr); err == nil {
Expand All @@ -94,10 +95,10 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g
if retry {
endorser = plan.nextPeerInGroup(endorser)
} else {
done <- newRpcError(codes.Aborted, "evaluate call to endorser returned error: "+message, errDetails...)
done <- newRpcError(code, "evaluate call to endorser returned error: "+message, errDetails...)
}
if endorser == nil {
done <- newRpcError(codes.Aborted, "failed to evaluate transaction, see attached details for more info", errDetails...)
done <- newRpcError(code, "failed to evaluate transaction, see attached details for more info", errDetails...)
}
}
}()
Expand Down Expand Up @@ -167,7 +168,7 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
// Otherwise, just let discovery pick one.
plan, err = gs.registry.endorsementPlan(channel, defaultInterest, nil)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
return nil, status.Errorf(codes.FailedPrecondition, "%s", err)
}
}
firstEndorser := plan.endorsers()[0]
Expand All @@ -185,9 +186,9 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout)
defer cancel()
firstResponse, err = firstEndorser.client.ProcessProposal(ctx, signedProposal)
success, message, _, remove := gs.responseStatus(firstResponse, err)
code, message, _, remove := gs.responseStatus(firstResponse, err)

if !success {
if code != codes.OK {
logger.Warnw("Endorse call to endorser failed", "channel", request.ChannelId, "txID", request.TransactionId, "endorserAddress", firstEndorser.endpointConfig.address, "endorserMspid", firstEndorser.endpointConfig.mspid, "error", message)
errDetails = append(errDetails, errorDetail(firstEndorser.endpointConfig, message))
if remove {
Expand Down Expand Up @@ -229,7 +230,7 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
// The preferred discovery layout will contain the firstEndorser's Org.
plan, err = gs.registry.endorsementPlan(channel, interest, firstEndorser)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
return nil, status.Errorf(codes.FailedPrecondition, "%s", err)
}

// 6. Remove the gateway org's endorser, since we've already done that
Expand Down Expand Up @@ -262,8 +263,8 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
// Ignore the retry flag returned by the following responseStatus call. Endorse will retry until all endorsement layouts have been exhausted.
// It tries to get a successful endorsement from each org and minimise the changes of a rogue peer scuppering the transaction.
// If an org is behaving badly, it can move on to a different layout.
success, message, _, remove := gs.responseStatus(response, err)
if success {
code, message, _, remove := gs.responseStatus(response, err)
if code == codes.OK {
logger.Debugw("Endorse call to endorser returned success", "channel", request.ChannelId, "txID", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "status", response.Response.Status, "message", response.Response.Message)

responseMessage := response.GetResponse()
Expand Down Expand Up @@ -335,37 +336,37 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
// determines how the gateway should react (retry?, close connection?).
// Uses the grpc canonical status error codes and their recommended actions.
// Returns:
// - response successful (bool)
// - response status code, with codes.OK indicating success and other values indicating likely error type
// - error message extracted from the err or generated from 500 proposal response (string)
// - should the gateway retry (only the Evaluate() uses this) (bool)
// - should the gateway close the connection and remove the peer from its registry (bool)
func (gs *Server) responseStatus(response *peer.ProposalResponse, err error) (success bool, message string, retry bool, remove bool) {
func (gs *Server) responseStatus(response *peer.ProposalResponse, err error) (statusCode codes.Code, message string, retry bool, remove bool) {
if err != nil {
if response == nil {
// there is no ProposalResponse, so this must have been generated by grpc in response to an unavailable peer
// - close the connection and retry on another
return false, err.Error(), true, true
return codes.Unavailable, err.Error(), true, true
}
// there is a response and an err, so it must have been from the unpackProposal() or preProcess() stages
// preProcess does all the signature and ACL checking. In either case, no point retrying, or closing the connection (it's a client error)
return false, err.Error(), false, false
return codes.FailedPrecondition, err.Error(), false, false
}
if response.Response.Status < 200 || response.Response.Status >= 400 {
if response.Payload == nil && response.Response.Status == 500 {
// there's a error 500 response but no payload, so the response was generated in the peer rather than the chaincode
if strings.HasSuffix(response.Response.Message, chaincode.ErrorStreamTerminated) {
// chaincode container crashed probably. Close connection and retry on another peer
return false, response.Response.Message, true, true
return codes.Aborted, response.Response.Message, true, true
}
// some other error - retry on another peer
return false, response.Response.Message, true, false
return codes.Aborted, response.Response.Message, true, false
} else {
// otherwise it must be an error response generated by the chaincode
return false, fmt.Sprintf("chaincode response %d, %s", response.Response.Status, response.Response.Message), false, false
return codes.Unknown, fmt.Sprintf("chaincode response %d, %s", response.Response.Status, response.Response.Message), false, false
}
}
// anything else is a success
return true, "", false, false
return codes.OK, "", false, false
}

// Submit will send the signed transaction to the ordering service. The response indicates whether the transaction was
Expand All @@ -384,7 +385,7 @@ func (gs *Server) Submit(ctx context.Context, request *gp.SubmitRequest) (*gp.Su
}
orderers, err := gs.registry.orderers(request.ChannelId)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
return nil, status.Errorf(codes.FailedPrecondition, "%s", err)
}

if len(orderers) == 0 {
Expand All @@ -400,34 +401,38 @@ func (gs *Server) Submit(ctx context.Context, request *gp.SubmitRequest) (*gp.Su
if err == nil {
return &gp.SubmitResponse{}, nil
}

logger.Warnw("Error sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.address, "err", err)
errDetails = append(errDetails, errorDetail(orderer.endpointConfig, err.Error()))

errStatus := toRpcStatus(err)
if errStatus.Code() != codes.Unavailable {
return nil, newRpcError(errStatus.Code(), errStatus.Message(), errDetails...)
}
}

return nil, newRpcError(codes.Aborted, "no orderers could successfully process transaction", errDetails...)
return nil, newRpcError(codes.Unavailable, "no orderers could successfully process transaction", errDetails...)
}

func (gs *Server) broadcast(ctx context.Context, orderer *orderer, txn *common.Envelope) error {
broadcast, err := orderer.client.Broadcast(ctx)
if err != nil {
return fmt.Errorf("failed to create BroadcastClient: %w", err)
return err
}

if err := broadcast.Send(txn); err != nil {
return fmt.Errorf("failed to send transaction to orderer: %w", err)
return err
}

response, err := broadcast.Recv()
if err != nil {
return fmt.Errorf("failed to receive response from orderer: %w", err)
return err
}

if response == nil {
return fmt.Errorf("received nil response from orderer")
if response.GetStatus() != common.Status_SUCCESS {
return status.Errorf(codes.Aborted, "received unsuccessful response from orderer: %s", common.Status_name[int32(response.GetStatus())])
}

if response.Status != common.Status_SUCCESS {
return fmt.Errorf("received unsuccessful response from orderer: %s", common.Status_name[int32(response.Status)])
}
return nil
}

Expand Down Expand Up @@ -458,7 +463,7 @@ func (gs *Server) CommitStatus(ctx context.Context, signedRequest *gp.SignedComm

txStatus, err := gs.commitFinder.TransactionStatus(ctx, request.ChannelId, request.TransactionId)
if err != nil {
return nil, toRpcError(err, codes.FailedPrecondition)
return nil, toRpcError(err, codes.Aborted)
}

response := &gp.CommitStatusResponse{
Expand Down Expand Up @@ -494,7 +499,7 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest

ledger, err := gs.ledgerProvider.Ledger(request.GetChannelId())
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
return status.Error(codes.NotFound, err.Error())
}

startBlock, err := startBlockFromLedgerPosition(ledger, request.GetStartPosition())
Expand All @@ -504,7 +509,7 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest

ledgerIter, err := ledger.GetBlocksIterator(startBlock)
if err != nil {
return status.Error(codes.Unavailable, err.Error())
return status.Error(codes.Aborted, err.Error())
}

eventsIter := event.NewChaincodeEventsIterator(ledgerIter)
Expand All @@ -513,7 +518,7 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest
for {
response, err := eventsIter.Next()
if err != nil {
return status.Error(codes.Unavailable, err.Error())
return status.Error(codes.Aborted, err.Error())
}

var matchingEvents []*peer.ChaincodeEvent
Expand All @@ -531,7 +536,11 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest
response.Events = matchingEvents

if err := stream.Send(response); err != nil {
return err // Likely stream closed by the client
if err == io.EOF {
// Stream closed by the client
return status.Error(codes.Canceled, err.Error())
}
return err
}
}
}
Expand All @@ -548,7 +557,7 @@ func startBlockFromLedgerPosition(ledger ledger.Ledger, position *ab.SeekPositio

ledgerInfo, err := ledger.GetBlockchainInfo()
if err != nil {
return 0, status.Error(codes.Unavailable, err.Error())
return 0, status.Error(codes.Aborted, err.Error())
}

return ledgerInfo.GetHeight(), nil
Expand Down
Loading