From 00910bad821d98b0111e81bc5fc02677618fac5f Mon Sep 17 00:00:00 2001 From: James Taylor Date: Tue, 4 May 2021 17:35:14 +0100 Subject: [PATCH] Handle missing endpoints from discovery For some reason the discovery PeersOfChannel function does not return an endpoint for the local peer, which the gateway was relying on Rather than updating PeersOfChannel to return the endpoint when there is one, to match the Peers function, this change updates how the gateway identifies the local peer Signed-off-by: James Taylor --- .../chaincode/marbles_private/chaincode.go | 30 ++++ integration/gateway/endorsing_orgs_test.go | 151 ++++++++++++++++++ integration/gateway/gateway_suite_test.go | 9 +- integration/gateway/gateway_test.go | 4 +- .../testdata/collections_config_anyorg.json | 18 +++ integration/nwo/standard_networks.go | 41 +++++ internal/pkg/gateway/api_test.go | 14 +- internal/pkg/gateway/endpoint.go | 6 +- internal/pkg/gateway/gateway.go | 6 +- internal/pkg/gateway/registry.go | 16 +- 10 files changed, 280 insertions(+), 15 deletions(-) create mode 100644 integration/gateway/endorsing_orgs_test.go create mode 100644 integration/gateway/testdata/collections_config_anyorg.json diff --git a/integration/chaincode/marbles_private/chaincode.go b/integration/chaincode/marbles_private/chaincode.go index 953213f0ae2..d3d263d27d4 100644 --- a/integration/chaincode/marbles_private/chaincode.go +++ b/integration/chaincode/marbles_private/chaincode.go @@ -70,6 +70,9 @@ func (t *MarblesPrivateChaincode) Invoke(stub shim.ChaincodeStubInterface) pb.Re case "getMarblePrivateDetailsHash": // get private data hash for collectionMarblePrivateDetails return t.getMarblePrivateDetailsHash(stub, args) + case "checkEndorsingOrg": + // check mspid of the current peer + return t.checkEndorsingOrg(stub) default: // error fmt.Println("invoke did not find func: " + function) @@ -494,3 +497,30 @@ func (t *MarblesPrivateChaincode) getMarblesByRange(stub shim.ChaincodeStubInter return shim.Success(buffer.Bytes()) } + +// CheckEndorsingOrg checks that the peer org is present in the given transient data +func (t *MarblesPrivateChaincode) checkEndorsingOrg(stub shim.ChaincodeStubInterface) pb.Response { + transient, err := stub.GetTransient() + if err != nil { + return shim.Error(fmt.Sprintf("failed to get transient data: %v", err)) + } + + peerOrgMSP, err := shim.GetMSPID() + if err != nil { + return shim.Error(fmt.Sprintf("failed getting client's orgID: %v", err)) + } + + var result string + if _, ok := transient[peerOrgMSP]; ok { + result = "Peer mspid OK" + } else { + expectedMSPs := make([]string, 0, len(transient)) + for k := range transient { + expectedMSPs = append(expectedMSPs, k) + } + + result = fmt.Sprintf("Unexpected peer mspid! Expected MSP IDs: %s Actual MSP ID: %s", expectedMSPs, peerOrgMSP) + } + + return shim.Success([]byte(result)) +} diff --git a/integration/gateway/endorsing_orgs_test.go b/integration/gateway/endorsing_orgs_test.go new file mode 100644 index 00000000000..089a1b72777 --- /dev/null +++ b/integration/gateway/endorsing_orgs_test.go @@ -0,0 +1,151 @@ +/* +Copyright IBM Corp All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package gateway + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "syscall" + + docker "github.com/fsouza/go-dockerclient" + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric-protos-go/gateway" + "github.com/hyperledger/fabric-protos-go/peer" + "github.com/hyperledger/fabric/integration/nwo" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/tedsuo/ifrit" +) + +var _ = Describe("GatewayService with endorsing orgs", func() { + var ( + testDir string + network *nwo.Network + orderer *nwo.Orderer + org1Peer0 *nwo.Peer + org2Peer0 *nwo.Peer + org3Peer0 *nwo.Peer + process ifrit.Process + ) + + BeforeEach(func() { + var err error + testDir, err = ioutil.TempDir("", "gateway") + Expect(err).NotTo(HaveOccurred()) + + client, err := docker.NewClientFromEnv() + Expect(err).NotTo(HaveOccurred()) + + config := nwo.ThreeOrgRaft() + network = nwo.New(config, testDir, client, StartPort(), components) + + network.GatewayEnabled = true + + network.GenerateConfigTree() + network.Bootstrap() + + networkRunner := network.NetworkGroupRunner() + process = ifrit.Invoke(networkRunner) + Eventually(process.Ready(), network.EventuallyTimeout).Should(BeClosed()) + + orderer = network.Orderer("orderer") + network.CreateAndJoinChannel(orderer, "testchannel") + network.UpdateChannelAnchors(orderer, "testchannel") + network.VerifyMembership( + network.PeersWithChannel("testchannel"), + "testchannel", + ) + nwo.EnableCapabilities( + network, + "testchannel", + "Application", "V2_0", + orderer, + network.PeersWithChannel("testchannel")..., + ) + + org1Peer0 = network.Peer("Org1", "peer0") + org2Peer0 = network.Peer("Org2", "peer0") + org3Peer0 = network.Peer("Org3", "peer0") + + chaincode := nwo.Chaincode{ + Name: "pvtmarblescc", + Version: "0.0", + Path: components.Build("github.com/hyperledger/fabric/integration/chaincode/marbles_private/cmd"), + Lang: "binary", + PackageFile: filepath.Join(testDir, "pvtmarblescc.tar.gz"), + Policy: `OR ('Org1MSP.member', 'Org2MSP.member', 'Org3MSP.member')`, + SignaturePolicy: `OR ('Org1MSP.member', 'Org2MSP.member', 'Org3MSP.member')`, + Sequence: "1", + InitRequired: false, + Label: "pvtmarblescc_label", + CollectionsConfig: filepath.Join("testdata", "collections_config_anyorg.json"), + } + + nwo.DeployChaincode(network, "testchannel", orderer, chaincode) + }) + + AfterEach(func() { + if process != nil { + process.Signal(syscall.SIGTERM) + Eventually(process.Wait(), network.EventuallyTimeout).Should(Receive()) + } + if network != nil { + network.Cleanup() + } + os.RemoveAll(testDir) + }) + + It("should execute chaincode on a peer in the specified org", func() { + peers := [3]*nwo.Peer{org1Peer0, org2Peer0, org3Peer0} + + for _, p := range peers { + conn := network.PeerClientConn(p) + defer conn.Close() + gatewayClient := gateway.NewGatewayClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), network.EventuallyTimeout) + defer cancel() + + signingIdentity := network.PeerUserSigner(p, "User1") + for _, o := range peers { + mspid := network.Organization(o.Organization).MSPID + + submitCheckEndorsingOrgTransaction(ctx, gatewayClient, signingIdentity, mspid) + } + } + }) +}) + +func submitCheckEndorsingOrgTransaction(ctx context.Context, client gateway.GatewayClient, signingIdentity *nwo.SigningIdentity, mspids ...string) { + transientData := make(map[string][]byte) + for _, m := range mspids { + transientData[m] = []byte(`true`) + } + + proposedTransaction, transactionID := NewProposedTransaction(signingIdentity, "testchannel", "pvtmarblescc", "checkEndorsingOrg", transientData) + + endorseRequest := &gateway.EndorseRequest{ + TransactionId: transactionID, + ChannelId: "testchannel", + ProposedTransaction: proposedTransaction, + EndorsingOrganizations: mspids, + } + + endorseResponse, err := client.Endorse(ctx, endorseRequest) + Expect(err).NotTo(HaveOccurred()) + + result := endorseResponse.GetResult() + expectedPayload := "Peer mspid OK" + Expect(string(result.Payload)).To(Equal(expectedPayload)) + expectedResult := &peer.Response{ + Status: 200, + Message: "", + Payload: []uint8(expectedPayload), + } + Expect(proto.Equal(result, expectedResult)).To(BeTrue(), "Expected\n\t%#v\nto proto.Equal\n\t%#v", result, expectedResult) +} diff --git a/integration/gateway/gateway_suite_test.go b/integration/gateway/gateway_suite_test.go index f08846b5ba2..b13ef3494e5 100644 --- a/integration/gateway/gateway_suite_test.go +++ b/integration/gateway/gateway_suite_test.go @@ -55,15 +55,15 @@ func StartPort() int { return integration.GatewayBasePort.StartPortForNode() } -func NewProposedTransaction(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, args ...[]byte) (*peer.SignedProposal, string) { - proposal, transactionID := newProposalProto(signingIdentity, channelName, chaincodeName, transactionName, args...) +func NewProposedTransaction(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.SignedProposal, string) { + proposal, transactionID := newProposalProto(signingIdentity, channelName, chaincodeName, transactionName, transientData, args...) signedProposal, err := protoutil.GetSignedProposal(proposal, signingIdentity) Expect(err).NotTo(HaveOccurred()) return signedProposal, transactionID } -func newProposalProto(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, args ...[]byte) (*peer.Proposal, string) { +func newProposalProto(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.Proposal, string) { creator, err := signingIdentity.Serialize() Expect(err).NotTo(HaveOccurred()) @@ -75,11 +75,12 @@ func newProposalProto(signingIdentity *nwo.SigningIdentity, channelName, chainco }, } - result, transactionID, err := protoutil.CreateChaincodeProposal( + result, transactionID, err := protoutil.CreateChaincodeProposalWithTransient( common.HeaderType_ENDORSER_TRANSACTION, channelName, invocationSpec, creator, + transientData, ) Expect(err).NotTo(HaveOccurred()) diff --git a/integration/gateway/gateway_test.go b/integration/gateway/gateway_test.go index 98c0756f931..bff6ab46849 100644 --- a/integration/gateway/gateway_test.go +++ b/integration/gateway/gateway_test.go @@ -114,7 +114,7 @@ var _ = Describe("GatewayService", func() { }) submitTransaction := func(transactionName string, args ...[]byte) (*peer.Response, string) { - proposedTransaction, transactionID := NewProposedTransaction(signingIdentity, "testchannel", "gatewaycc", transactionName, args...) + proposedTransaction, transactionID := NewProposedTransaction(signingIdentity, "testchannel", "gatewaycc", transactionName, nil, args...) endorseRequest := &gateway.EndorseRequest{ TransactionId: transactionID, @@ -165,7 +165,7 @@ var _ = Describe("GatewayService", func() { Describe("Evaluate", func() { It("should respond with the expected result", func() { - proposedTransaction, transactionID := NewProposedTransaction(signingIdentity, "testchannel", "gatewaycc", "respond", []byte("200"), []byte("conga message"), []byte("conga payload")) + proposedTransaction, transactionID := NewProposedTransaction(signingIdentity, "testchannel", "gatewaycc", "respond", nil, []byte("200"), []byte("conga message"), []byte("conga payload")) request := &gateway.EvaluateRequest{ TransactionId: transactionID, diff --git a/integration/gateway/testdata/collections_config_anyorg.json b/integration/gateway/testdata/collections_config_anyorg.json new file mode 100644 index 00000000000..f0dce3a1c1f --- /dev/null +++ b/integration/gateway/testdata/collections_config_anyorg.json @@ -0,0 +1,18 @@ +[ + { + "name": "collectionMarbles", + "policy": "OR('Org1MSP.member', 'Org2MSP.member', 'Org3MSP.member')", + "requiredPeerCount": 1, + "maxPeerCount": 2, + "blockToLive": 1000000, + "memberOnlyRead": false + }, + { + "name": "collectionMarblePrivateDetails", + "policy": "OR('Org1MSP.member', 'Org2MSP.member', 'Org3MSP.member')", + "requiredPeerCount": 1, + "maxPeerCount": 2, + "blockToLive": 1000000, + "memberOnlyRead": false + } +] diff --git a/integration/nwo/standard_networks.go b/integration/nwo/standard_networks.go index 3c834dd05d8..70ab1da1b1a 100644 --- a/integration/nwo/standard_networks.go +++ b/integration/nwo/standard_networks.go @@ -227,6 +227,47 @@ func MinimalRaft() *Config { return config } +func ThreeOrgRaft() *Config { + config := BasicEtcdRaft() + + config.Organizations = append( + config.Organizations, + &Organization{ + Name: "Org3", + MSPID: "Org3MSP", + Domain: "org3.example.com", + Users: 2, + CA: &CA{Hostname: "ca"}, + }, + ) + config.Consortiums[0].Organizations = append( + config.Consortiums[0].Organizations, + "Org3", + ) + config.SystemChannel.Profile = "ThreeOrgsOrdererGenesis" + config.Channels[0].Profile = "ThreeOrgsChannel" + config.Peers = append( + config.Peers, + &Peer{ + Name: "peer0", + Organization: "Org3", + Channels: []*PeerChannel{ + {Name: "testchannel", Anchor: true}, + }, + }, + ) + config.Profiles = []*Profile{{ + Name: "ThreeOrgsOrdererGenesis", + Orderers: []string{"orderer"}, + }, { + Name: "ThreeOrgsChannel", + Consortium: "SampleConsortium", + Organizations: []string{"Org1", "Org2", "Org3"}, + }} + + return config +} + func MultiChannelEtcdRaft() *Config { config := MultiChannelBasicSolo() diff --git a/internal/pkg/gateway/api_test.go b/internal/pkg/gateway/api_test.go index 5e43bc30fe0..a8194c2b2c9 100644 --- a/internal/pkg/gateway/api_test.go +++ b/internal/pkg/gateway/api_test.go @@ -196,6 +196,17 @@ func TestEvaluate(t *testing.T) { }, expectedEndorsers: []string{"localhost:7051"}, }, + { + name: "five endorsers, prefer host peer despite no endpoint", + members: []networkMember{ + {"id1", "", "msp1", 5}, + {"id2", "peer1:8051", "msp1", 5}, + {"id3", "peer2:9051", "msp2", 6}, + {"id4", "peer3:10051", "msp2", 5}, + {"id5", "peer4:11051", "msp3", 6}, + }, + expectedEndorsers: []string{"localhost:7051"}, + }, { name: "evaluate with targetOrganizations, prefer local org despite block height", members: []networkMember{ @@ -974,6 +985,7 @@ func TestNilArgs(t *testing.T) { &mocks.CommitFinder{}, &mocks.Eventer{}, &mocks.ACLChecker{}, + common.PKIidType("id1"), "localhost:7051", "msp1", config.GetOptions(viper.New()), @@ -1080,7 +1092,7 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest { EndorsementTimeout: endorsementTimeout, } - server := newServer(localEndorser, disc, mockFinder, mockEventer, mockPolicy, "localhost:7051", "msp1", options) + server := newServer(localEndorser, disc, mockFinder, mockEventer, mockPolicy, common.PKIidType("id1"), "localhost:7051", "msp1", options) dialer := &mocks.Dialer{} dialer.Returns(nil, nil) diff --git a/internal/pkg/gateway/endpoint.go b/internal/pkg/gateway/endpoint.go index 4f3fdff0f1b..bb5552d9041 100644 --- a/internal/pkg/gateway/endpoint.go +++ b/internal/pkg/gateway/endpoint.go @@ -13,6 +13,7 @@ import ( ab "github.com/hyperledger/fabric-protos-go/orderer" "github.com/hyperledger/fabric-protos-go/peer" + "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/internal/pkg/comm" "google.golang.org/grpc" ) @@ -28,6 +29,7 @@ type orderer struct { } type endpointConfig struct { + pkiid common.PKIidType address string mspid string } @@ -47,7 +49,7 @@ type endpointFactory struct { dialer dialer } -func (ef *endpointFactory) newEndorser(address, mspid string, tlsRootCerts [][]byte) (*endorser, error) { +func (ef *endpointFactory) newEndorser(pkiid common.PKIidType, address, mspid string, tlsRootCerts [][]byte) (*endorser, error) { conn, err := ef.newConnection(address, tlsRootCerts) if err != nil { return nil, err @@ -58,7 +60,7 @@ func (ef *endpointFactory) newEndorser(address, mspid string, tlsRootCerts [][]b } return &endorser{ client: connectEndorser(conn), - endpointConfig: &endpointConfig{address: address, mspid: mspid}, + endpointConfig: &endpointConfig{pkiid: pkiid, address: address, mspid: mspid}, }, nil } diff --git a/internal/pkg/gateway/gateway.go b/internal/pkg/gateway/gateway.go index 53aef5995a5..aee8bf58a9e 100644 --- a/internal/pkg/gateway/gateway.go +++ b/internal/pkg/gateway/gateway.go @@ -11,6 +11,7 @@ import ( peerproto "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/core/peer" + "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/internal/pkg/gateway/commit" "github.com/hyperledger/fabric/internal/pkg/gateway/config" "google.golang.org/grpc" @@ -62,16 +63,17 @@ func CreateServer(localEndorser peerproto.EndorserServer, discovery Discovery, p commit.NewFinder(adapter, notifier), commit.NewEventer(notifier), policy, + peerInstance.GossipService.SelfMembershipInfo().PKIid, peerInstance.GossipService.SelfMembershipInfo().Endpoint, localMSPID, options, ) } -func newServer(localEndorser peerproto.EndorserClient, discovery Discovery, finder CommitFinder, eventer Eventer, policy ACLChecker, localEndpoint, localMSPID string, options config.Options) *Server { +func newServer(localEndorser peerproto.EndorserClient, discovery Discovery, finder CommitFinder, eventer Eventer, policy ACLChecker, localPKIID common.PKIidType, localEndpoint, localMSPID string, options config.Options) *Server { gwServer := &Server{ registry: ®istry{ - localEndorser: &endorser{client: localEndorser, endpointConfig: &endpointConfig{address: localEndpoint, mspid: localMSPID}}, + localEndorser: &endorser{client: localEndorser, endpointConfig: &endpointConfig{pkiid: localPKIID, address: localEndpoint, mspid: localMSPID}}, discovery: discovery, logger: logger, endpointFactory: &endpointFactory{timeout: options.EndorsementTimeout}, diff --git a/internal/pkg/gateway/registry.go b/internal/pkg/gateway/registry.go index a1a675d5993..9e4f7b50529 100644 --- a/internal/pkg/gateway/registry.go +++ b/internal/pkg/gateway/registry.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package gateway import ( + "bytes" "fmt" "sort" "strings" @@ -165,12 +166,19 @@ func (reg *registry) endorsersByOrg(channel string, chaincode string) map[string defer reg.configLock.RUnlock() for _, member := range members { + pkiid := member.PKIid endpoint := member.PreferredEndpoint() + // find the endorser in the registry for this endpoint var endorser *endorser - if endpoint == reg.localEndorser.address { + if bytes.Equal(pkiid, reg.localEndorser.pkiid) { + logger.Debugw("Found local endorser", "pkiid", pkiid) endorser = reg.localEndorser + } else if endpoint == "" { + reg.logger.Warnf("No endpoint for endorser with PKI ID %s", pkiid.String()) + continue } else if e, ok := reg.remoteEndorsers[endpoint]; ok { + logger.Debugw("Found remote endorser", "endpoint", endpoint) endorser = e } else { reg.logger.Warnf("Failed to find endorser at %s", endpoint) @@ -317,13 +325,13 @@ func (reg *registry) registerChannel(channel string) error { } for mspid, infoset := range reg.discovery.IdentityInfo().ByOrg() { for _, info := range infoset { - pkid := info.PKIId.String() - if address, ok := peers[pkid]; ok { + pkiid := info.PKIId + if address, ok := peers[pkiid.String()]; ok { // add the peer to the peer map - except the local peer, which seems to have an empty address if _, ok := reg.remoteEndorsers[address]; !ok && len(address) > 0 { // this peer is new - connect to it and add to the remoteEndorsers registry tlsRootCerts := reg.tlsRootCerts[mspid] - endorser, err := reg.endpointFactory.newEndorser(address, mspid, tlsRootCerts) + endorser, err := reg.endpointFactory.newEndorser(pkiid, address, mspid, tlsRootCerts) if err != nil { return err }