diff --git a/integration/gateway/gateway_suite_test.go b/integration/gateway/gateway_suite_test.go index b13ef3494e5..c589868d6ba 100644 --- a/integration/gateway/gateway_suite_test.go +++ b/integration/gateway/gateway_suite_test.go @@ -10,12 +10,9 @@ import ( "encoding/json" "testing" - "github.com/hyperledger/fabric-protos-go/common" - "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/integration" "github.com/hyperledger/fabric/integration/nwo" - "github.com/hyperledger/fabric/protoutil" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -54,44 +51,3 @@ var _ = SynchronizedAfterSuite(func() { func StartPort() int { return integration.GatewayBasePort.StartPortForNode() } - -func NewProposedTransaction(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.SignedProposal, string) { - proposal, transactionID := newProposalProto(signingIdentity, channelName, chaincodeName, transactionName, transientData, args...) - signedProposal, err := protoutil.GetSignedProposal(proposal, signingIdentity) - Expect(err).NotTo(HaveOccurred()) - - return signedProposal, transactionID -} - -func newProposalProto(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.Proposal, string) { - creator, err := signingIdentity.Serialize() - Expect(err).NotTo(HaveOccurred()) - - invocationSpec := &peer.ChaincodeInvocationSpec{ - ChaincodeSpec: &peer.ChaincodeSpec{ - Type: peer.ChaincodeSpec_NODE, - ChaincodeId: &peer.ChaincodeID{Name: chaincodeName}, - Input: &peer.ChaincodeInput{Args: chaincodeArgs(transactionName, args...)}, - }, - } - - result, transactionID, err := protoutil.CreateChaincodeProposalWithTransient( - common.HeaderType_ENDORSER_TRANSACTION, - channelName, - invocationSpec, - creator, - transientData, - ) - Expect(err).NotTo(HaveOccurred()) - - return result, transactionID -} - -func chaincodeArgs(transactionName string, args ...[]byte) [][]byte { - result := make([][]byte, len(args)+1) - - result[0] = []byte(transactionName) - copy(result[1:], args) - - return result -} diff --git a/integration/gateway/gateway_test.go b/integration/gateway/gateway_test.go index 4d8ba290b1f..a48c0548051 100644 --- a/integration/gateway/gateway_test.go +++ b/integration/gateway/gateway_test.go @@ -16,9 +16,11 @@ import ( docker "github.com/fsouza/go-dockerclient" "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/gateway" "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/integration/nwo" + "github.com/hyperledger/fabric/protoutil" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/tedsuo/ifrit" @@ -27,6 +29,47 @@ import ( "google.golang.org/grpc/status" ) +func NewProposedTransaction(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.SignedProposal, string) { + proposal, transactionID := newProposalProto(signingIdentity, channelName, chaincodeName, transactionName, transientData, args...) + signedProposal, err := protoutil.GetSignedProposal(proposal, signingIdentity) + Expect(err).NotTo(HaveOccurred()) + + return signedProposal, transactionID +} + +func newProposalProto(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.Proposal, string) { + creator, err := signingIdentity.Serialize() + Expect(err).NotTo(HaveOccurred()) + + invocationSpec := &peer.ChaincodeInvocationSpec{ + ChaincodeSpec: &peer.ChaincodeSpec{ + Type: peer.ChaincodeSpec_NODE, + ChaincodeId: &peer.ChaincodeID{Name: chaincodeName}, + Input: &peer.ChaincodeInput{Args: chaincodeArgs(transactionName, args...)}, + }, + } + + result, transactionID, err := protoutil.CreateChaincodeProposalWithTransient( + common.HeaderType_ENDORSER_TRANSACTION, + channelName, + invocationSpec, + creator, + transientData, + ) + Expect(err).NotTo(HaveOccurred()) + + return result, transactionID +} + +func chaincodeArgs(transactionName string, args ...[]byte) [][]byte { + result := make([][]byte, len(args)+1) + + result[0] = []byte(transactionName) + copy(result[1:], args) + + return result +} + var _ = Describe("GatewayService", func() { var ( testDir string diff --git a/internal/pkg/gateway/api.go b/internal/pkg/gateway/api.go index 86ad6ffb294..9942498cfd5 100644 --- a/internal/pkg/gateway/api.go +++ b/internal/pkg/gateway/api.go @@ -16,6 +16,7 @@ import ( gp "github.com/hyperledger/fabric-protos-go/gateway" "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/core/aclmgmt/resources" + "github.com/hyperledger/fabric/internal/pkg/gateway/event" "github.com/hyperledger/fabric/protoutil" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -355,21 +356,46 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest return status.Error(codes.PermissionDenied, err.Error()) } - events, err := gs.eventer.ChaincodeEvents(stream.Context(), request.ChannelId, request.ChaincodeId) + ledger, err := gs.ledgerProvider.Ledger(request.ChannelId) if err != nil { - return status.Error(codes.FailedPrecondition, err.Error()) + return status.Error(codes.InvalidArgument, err.Error()) } - for event := range events { - response := &gp.ChaincodeEventsResponse{ - BlockNumber: event.BlockNumber, - Events: event.Events, + ledgerInfo, err := ledger.GetBlockchainInfo() + if err != nil { + return status.Error(codes.Unavailable, err.Error()) + } + + ledgerIter, err := ledger.GetBlocksIterator(ledgerInfo.Height) + if err != nil { + return status.Error(codes.Unavailable, err.Error()) + } + + eventsIter := event.NewChaincodeEventsIterator(ledgerIter) + defer eventsIter.Close() + + for { + response, err := eventsIter.Next() + if err != nil { + return status.Error(codes.Unavailable, err.Error()) } + + var matchingEvents []*peer.ChaincodeEvent + + for _, event := range response.Events { + if event.GetChaincodeId() == request.ChaincodeId { + matchingEvents = append(matchingEvents, event) + } + } + + if len(matchingEvents) == 0 { + continue + } + + response.Events = matchingEvents + if err := stream.Send(response); err != nil { return err // Likely stream closed by the client } } - - // If stream is still open, this was a server-side error; otherwise client won't see it anyway - return status.Error(codes.Unavailable, "failed to read events") } diff --git a/internal/pkg/gateway/api_test.go b/internal/pkg/gateway/api_test.go index 0bf96f515fd..a61ac0c751f 100644 --- a/internal/pkg/gateway/api_test.go +++ b/internal/pkg/gateway/api_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/timestamp" cp "github.com/hyperledger/fabric-protos-go/common" dp "github.com/hyperledger/fabric-protos-go/discovery" pb "github.com/hyperledger/fabric-protos-go/gateway" @@ -21,6 +22,7 @@ import ( ab "github.com/hyperledger/fabric-protos-go/orderer" "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/common/crypto/tlsgen" + commonledger "github.com/hyperledger/fabric/common/ledger" "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/common" gdiscovery "github.com/hyperledger/fabric/gossip/discovery" @@ -63,11 +65,6 @@ type commitFinder interface { CommitFinder } -//go:generate counterfeiter -o mocks/eventer.go --fake-name Eventer . eventer -type eventer interface { - Eventer -} - //go:generate counterfeiter -o mocks/chaincodeeventsserver.go --fake-name ChaincodeEventsServer github.com/hyperledger/fabric-protos-go/gateway.Gateway_ChaincodeEventsServer //go:generate counterfeiter -o mocks/aclchecker.go --fake-name ACLChecker . aclChecker @@ -75,6 +72,21 @@ type aclChecker interface { ACLChecker } +//go:generate counterfeiter -o mocks/ledgerprovider.go --fake-name LedgerProvider . ledgerProvider +type ledgerProvider interface { + LedgerProvider +} + +//go:generate counterfeiter -o mocks/ledger.go --fake-name Ledger . mockLedger +type mockLedger interface { + commonledger.Ledger +} + +//go:generate counterfeiter -o mocks/resultsiterator.go --fake-name ResultsIterator . resultsIterator +type mockResultsIterator interface { + commonledger.ResultsIterator +} + type ( endorsementPlan map[string][]endorserState endorsementLayout map[string]uint32 @@ -124,16 +136,17 @@ type testDef struct { endpointDefinition *endpointDef endorsingOrgs []string postSetup func(t *testing.T, def *preparedTest) + postTest func(t *testing.T, def *preparedTest) expectedEndorsers []string finderStatus *commit.Status finderErr error - chaincodeEvents []*commit.BlockChaincodeEvents eventErr error policyErr error expectedResponse proto.Message expectedResponses []proto.Message transientData map[string][]byte interest *peer.ChaincodeInterest + blocks []*cp.Block } type preparedTest struct { @@ -144,9 +157,11 @@ type preparedTest struct { discovery *mocks.Discovery dialer *mocks.Dialer finder *mocks.CommitFinder - eventer *mocks.Eventer eventsServer *mocks.ChaincodeEventsServer policy *mocks.ACLChecker + ledgerProvider *mocks.LedgerProvider + ledger *mocks.Ledger + blockIterator *mocks.ResultsIterator } type contextKey string @@ -962,76 +977,234 @@ func TestCommitStatus(t *testing.T) { } func TestChaincodeEvents(t *testing.T) { - closedEventsChannel := make(chan *commit.BlockChaincodeEvents) - close(closedEventsChannel) + now := time.Now() + transactionId := "TRANSACTION_ID" + + matchChaincodeEvent := &peer.ChaincodeEvent{ + ChaincodeId: testChaincode, + TxId: transactionId, + EventName: "EVENT_NAME", + Payload: []byte("PAYLOAD"), + } + + mismatchChaincodeEvent := &peer.ChaincodeEvent{ + ChaincodeId: "WRONG_CHAINCODE_ID", + TxId: transactionId, + EventName: "EVENT_NAME", + Payload: []byte("PAYLOAD"), + } + + txHeader := &cp.Header{ + ChannelHeader: protoutil.MarshalOrPanic(&cp.ChannelHeader{ + Type: int32(cp.HeaderType_ENDORSER_TRANSACTION), + Timestamp: ×tamp.Timestamp{ + Seconds: now.Unix(), + Nanos: int32(now.Nanosecond()), + }, + TxId: transactionId, + }), + } + + matchTxEnvelope := &cp.Envelope{ + Payload: protoutil.MarshalOrPanic(&cp.Payload{ + Header: txHeader, + Data: protoutil.MarshalOrPanic(&peer.Transaction{ + Actions: []*peer.TransactionAction{ + { + Payload: protoutil.MarshalOrPanic(&peer.ChaincodeActionPayload{ + Action: &peer.ChaincodeEndorsedAction{ + ProposalResponsePayload: protoutil.MarshalOrPanic(&peer.ProposalResponsePayload{ + Extension: protoutil.MarshalOrPanic(&peer.ChaincodeAction{ + Events: protoutil.MarshalOrPanic(matchChaincodeEvent), + }), + }), + }, + }), + }, + }, + }), + }), + } + + mismatchTxEnvelope := &cp.Envelope{ + Payload: protoutil.MarshalOrPanic(&cp.Payload{ + Header: txHeader, + Data: protoutil.MarshalOrPanic(&peer.Transaction{ + Actions: []*peer.TransactionAction{ + { + Payload: protoutil.MarshalOrPanic(&peer.ChaincodeActionPayload{ + Action: &peer.ChaincodeEndorsedAction{ + ProposalResponsePayload: protoutil.MarshalOrPanic(&peer.ProposalResponsePayload{ + Extension: protoutil.MarshalOrPanic(&peer.ChaincodeAction{ + Events: protoutil.MarshalOrPanic(mismatchChaincodeEvent), + }), + }), + }, + }), + }, + }, + }), + }), + } + + block100Proto := &cp.Block{ + Header: &cp.BlockHeader{ + Number: 100, + }, + Metadata: &cp.BlockMetadata{ + Metadata: [][]byte{ + nil, + nil, + { + byte(peer.TxValidationCode_VALID), + }, + nil, + nil, + }, + }, + Data: &cp.BlockData{ + Data: [][]byte{ + protoutil.MarshalOrPanic(mismatchTxEnvelope), + }, + }, + } + + block101Proto := &cp.Block{ + Header: &cp.BlockHeader{ + Number: 101, + }, + Metadata: &cp.BlockMetadata{ + Metadata: [][]byte{ + nil, + nil, + { + byte(peer.TxValidationCode_VALID), + byte(peer.TxValidationCode_VALID), + byte(peer.TxValidationCode_VALID), + }, + nil, + nil, + }, + }, + Data: &cp.BlockData{ + Data: [][]byte{ + protoutil.MarshalOrPanic(&cp.Envelope{ + Payload: protoutil.MarshalOrPanic(&cp.Payload{ + Header: &cp.Header{ + ChannelHeader: protoutil.MarshalOrPanic(&cp.ChannelHeader{ + Type: int32(cp.HeaderType_CONFIG_UPDATE), + }), + }, + }), + }), + protoutil.MarshalOrPanic(mismatchTxEnvelope), + protoutil.MarshalOrPanic(matchTxEnvelope), + }, + }, + } tests := []testDef{ { - name: "error establishing event reading", + name: "error reading events", eventErr: errors.New("EVENT_ERROR"), - errString: "rpc error: code = FailedPrecondition desc = EVENT_ERROR", + errString: "rpc error: code = Unavailable desc = EVENT_ERROR", }, { name: "returns chaincode events", - chaincodeEvents: []*commit.BlockChaincodeEvents{ - { - BlockNumber: 101, + blocks: []*cp.Block{ + block101Proto, + }, + expectedResponses: []proto.Message{ + &pb.ChaincodeEventsResponse{ + BlockNumber: block101Proto.GetHeader().GetNumber(), Events: []*peer.ChaincodeEvent{ { ChaincodeId: testChaincode, - TxId: "TX_ID", - EventName: "EVENT_NAME", - Payload: []byte("PAYLOAD"), + TxId: matchChaincodeEvent.GetTxId(), + EventName: matchChaincodeEvent.GetEventName(), + Payload: matchChaincodeEvent.GetPayload(), }, }, }, }, + }, + { + name: "skips blocks containing only non-matching chaincode events", + blocks: []*cp.Block{ + block100Proto, + block101Proto, + }, expectedResponses: []proto.Message{ &pb.ChaincodeEventsResponse{ - BlockNumber: 101, + BlockNumber: block101Proto.GetHeader().GetNumber(), Events: []*peer.ChaincodeEvent{ { ChaincodeId: testChaincode, - TxId: "TX_ID", - EventName: "EVENT_NAME", - Payload: []byte("PAYLOAD"), + TxId: matchChaincodeEvent.GetTxId(), + EventName: matchChaincodeEvent.GetEventName(), + Payload: matchChaincodeEvent.GetPayload(), }, }, }, }, }, { - name: "passes channel name to eventer", + name: "passes channel name to ledger provider", + postTest: func(t *testing.T, test *preparedTest) { + require.Equal(t, 1, test.ledgerProvider.LedgerCallCount()) + require.Equal(t, testChannel, test.ledgerProvider.LedgerArgsForCall(0)) + }, + }, + { + name: "returns error obtaining ledger", + blocks: []*cp.Block{ + block101Proto, + }, + errString: "rpc error: code = InvalidArgument desc = LEDGER_PROVIDER_ERROR", + postSetup: func(t *testing.T, test *preparedTest) { + test.ledgerProvider.LedgerReturns(nil, errors.New("LEDGER_PROVIDER_ERROR")) + }, + }, + { + name: "returns error obtaining ledger height", + blocks: []*cp.Block{ + block101Proto, + }, + errString: "rpc error: code = Unavailable desc = LEDGER_INFO_ERROR", postSetup: func(t *testing.T, test *preparedTest) { - test.eventer.ChaincodeEventsCalls(func(ctx context.Context, channelName string, chaincodeName string) (<-chan *commit.BlockChaincodeEvents, error) { - require.Equal(t, testChannel, channelName) - return closedEventsChannel, nil - }) + test.ledger.GetBlockchainInfoReturns(nil, errors.New("LEDGER_INFO_ERROR")) }, }, { - name: "passes chaincode ID to eventer", + name: "uses block height as default start block", + blocks: []*cp.Block{ + block101Proto, + }, postSetup: func(t *testing.T, test *preparedTest) { - test.eventer.ChaincodeEventsCalls(func(ctx context.Context, channelName string, chaincodeName string) (<-chan *commit.BlockChaincodeEvents, error) { - require.Equal(t, testChaincode, chaincodeName) - return closedEventsChannel, nil - }) + ledgerInfo := &cp.BlockchainInfo{ + Height: 101, + } + test.ledger.GetBlockchainInfoReturns(ledgerInfo, nil) + }, + postTest: func(t *testing.T, test *preparedTest) { + require.Equal(t, 1, test.ledger.GetBlocksIteratorCallCount()) + require.EqualValues(t, 101, test.ledger.GetBlocksIteratorArgsForCall(0)) + }, + }, + { + name: "returns error obtaining ledger iterator", + blocks: []*cp.Block{ + block101Proto, + }, + errString: "rpc error: code = Unavailable desc = LEDGER_ITERATOR_ERROR", + postSetup: func(t *testing.T, test *preparedTest) { + test.ledger.GetBlocksIteratorReturns(nil, errors.New("LEDGER_ITERATOR_ERROR")) }, }, { name: "returns error from send to client", - chaincodeEvents: []*commit.BlockChaincodeEvents{ - { - BlockNumber: 101, - Events: []*peer.ChaincodeEvent{ - { - ChaincodeId: testChaincode, - TxId: "TX_ID", - EventName: "EVENT_NAME", - Payload: []byte("PAYLOAD"), - }, - }, - }, + blocks: []*cp.Block{ + block101Proto, }, errString: "rpc error: code = Aborted desc = SEND_ERROR", postSetup: func(t *testing.T, test *preparedTest) { @@ -1045,29 +1218,23 @@ func TestChaincodeEvents(t *testing.T) { }, { name: "passes channel name to policy checker", - postSetup: func(t *testing.T, test *preparedTest) { - test.policy.CheckACLCalls(func(policyName string, channelName string, data interface{}) error { - require.Equal(t, testChannel, channelName) - return nil - }) + postTest: func(t *testing.T, test *preparedTest) { + require.Equal(t, 1, test.policy.CheckACLCallCount()) + _, channelName, _ := test.policy.CheckACLArgsForCall(0) + require.Equal(t, testChannel, channelName) }, }, { name: "passes identity to policy checker", identity: []byte("IDENTITY"), - postSetup: func(t *testing.T, test *preparedTest) { - test.policy.CheckACLCalls(func(policyName string, channelName string, data interface{}) error { - require.IsType(t, &protoutil.SignedData{}, data) - signedData := data.(*protoutil.SignedData) - require.Equal(t, []byte("IDENTITY"), signedData.Identity) - return nil - }) + postTest: func(t *testing.T, test *preparedTest) { + require.Equal(t, 1, test.policy.CheckACLCallCount()) + _, _, data := test.policy.CheckACLArgsForCall(0) + require.IsType(t, &protoutil.SignedData{}, data) + signedData := data.(*protoutil.SignedData) + require.Equal(t, []byte("IDENTITY"), signedData.Identity) }, }, - { - name: "error when no more events can be read", - errString: "rpc error: code = Unavailable desc = failed to read events", - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1095,7 +1262,11 @@ func TestChaincodeEvents(t *testing.T) { for i, expectedResponse := range tt.expectedResponses { actualResponse := test.eventsServer.SendArgsForCall(i) - require.True(t, proto.Equal(expectedResponse, actualResponse)) + require.True(t, proto.Equal(expectedResponse, actualResponse), "response[%d] mismatch: %v", i, actualResponse) + } + + if tt.postTest != nil { + tt.postTest(t, test) } }) } @@ -1106,8 +1277,8 @@ func TestNilArgs(t *testing.T) { &mocks.EndorserClient{}, &mocks.Discovery{}, &mocks.CommitFinder{}, - &mocks.Eventer{}, &mocks.ACLChecker{}, + &mocks.LedgerProvider{}, common.PKIidType("id1"), "localhost:7051", "msp1", @@ -1164,17 +1335,38 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest { mockFinder := &mocks.CommitFinder{} mockFinder.TransactionStatusReturns(tt.finderStatus, tt.finderErr) - eventChannel := make(chan *commit.BlockChaincodeEvents, len(tt.chaincodeEvents)) - for _, event := range tt.chaincodeEvents { - eventChannel <- event - } - close(eventChannel) - mockEventer := &mocks.Eventer{} - mockEventer.ChaincodeEventsReturns(eventChannel, tt.eventErr) - mockPolicy := &mocks.ACLChecker{} mockPolicy.CheckACLReturns(tt.policyErr) + mockBlockIterator := &mocks.ResultsIterator{} + blockChannel := make(chan *cp.Block, len(tt.blocks)) + for _, block := range tt.blocks { + blockChannel <- block + } + close(blockChannel) + mockBlockIterator.NextCalls(func() (commonledger.QueryResult, error) { + if tt.eventErr != nil { + return nil, tt.eventErr + } + + block := <-blockChannel + if block == nil { + return nil, errors.New("NO_MORE_BLOCKS") + } + + return block, nil + }) + + mockLedger := &mocks.Ledger{} + ledgerInfo := &cp.BlockchainInfo{ + Height: 1, + } + mockLedger.GetBlockchainInfoReturns(ledgerInfo, nil) + mockLedger.GetBlocksIteratorReturns(mockBlockIterator, nil) + + mockLedgerProvider := &mocks.LedgerProvider{} + mockLedgerProvider.LedgerReturns(mockLedger, nil) + validProposal := createProposal(t, testChannel, testChaincode, tt.transientData) validSignedProposal, err := protoutil.GetSignedProposal(validProposal, mockSigner) require.NoError(t, err) @@ -1215,7 +1407,7 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest { EndorsementTimeout: endorsementTimeout, } - server := newServer(localEndorser, disc, mockFinder, mockEventer, mockPolicy, common.PKIidType("id1"), "localhost:7051", "msp1", options) + server := newServer(localEndorser, disc, mockFinder, mockPolicy, mockLedgerProvider, common.PKIidType("id1"), "localhost:7051", "msp1", options) dialer := &mocks.Dialer{} dialer.Returns(nil, nil) @@ -1232,9 +1424,11 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest { discovery: disc, dialer: dialer, finder: mockFinder, - eventer: mockEventer, eventsServer: &mocks.ChaincodeEventsServer{}, policy: mockPolicy, + ledgerProvider: mockLedgerProvider, + ledger: mockLedger, + blockIterator: mockBlockIterator, } if tt.postSetup != nil { tt.postSetup(t, pt) diff --git a/internal/pkg/gateway/commit/chaincodeeventnotifier.go b/internal/pkg/gateway/commit/chaincodeeventnotifier.go deleted file mode 100644 index 1b3800aa208..00000000000 --- a/internal/pkg/gateway/commit/chaincodeeventnotifier.go +++ /dev/null @@ -1,166 +0,0 @@ -/* -Copyright IBM Corp. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package commit - -import ( - "sync" - - "github.com/golang/protobuf/proto" - "github.com/hyperledger/fabric-protos-go/peer" - "github.com/hyperledger/fabric/core/ledger" -) - -type BlockChaincodeEvents struct { - BlockNumber uint64 - Events []*peer.ChaincodeEvent -} - -type chaincodeEventListenerSet map[*chaincodeEventListener]struct{} - -type chaincodeEventNotifier struct { - lock sync.Mutex - listenersByChaincodeName map[string]chaincodeEventListenerSet - closed bool -} - -func newChaincodeEventNotifier() *chaincodeEventNotifier { - return &chaincodeEventNotifier{ - listenersByChaincodeName: make(map[string]chaincodeEventListenerSet), - } -} - -func (notifier *chaincodeEventNotifier) ReceiveBlock(blockEvent *ledger.CommitNotification) { - notifier.removeCompletedListeners() - - chaincodeEvents := getEventsByChaincodeName(blockEvent) - notifier.notify(chaincodeEvents) -} - -func (notifier *chaincodeEventNotifier) removeCompletedListeners() { - notifier.lock.Lock() - defer notifier.lock.Unlock() - - for chaincodeName, listeners := range notifier.listenersByChaincodeName { - for listener := range listeners { - if listener.isDone() { - notifier.removeListener(chaincodeName, listener) - } - } - } -} - -func (notifier *chaincodeEventNotifier) removeListener(chaincodeName string, listener *chaincodeEventListener) { - listener.close() - - listeners := notifier.listenersByChaincodeName[chaincodeName] - delete(listeners, listener) - - if len(listeners) == 0 { - delete(notifier.listenersByChaincodeName, chaincodeName) - } -} - -func getEventsByChaincodeName(blockEvent *ledger.CommitNotification) map[string]*BlockChaincodeEvents { - results := make(map[string]*BlockChaincodeEvents) - - for _, txInfo := range blockEvent.TxsInfo { - if txInfo.ChaincodeEventData != nil { - event := &peer.ChaincodeEvent{} - if err := proto.Unmarshal(txInfo.ChaincodeEventData, event); err != nil { - continue - } - - events := results[event.ChaincodeId] - if events == nil { - events = &BlockChaincodeEvents{ - BlockNumber: blockEvent.BlockNumber, - } - results[event.ChaincodeId] = events - } - - events.Events = append(events.Events, event) - } - } - - return results -} - -func (notifier *chaincodeEventNotifier) notify(eventsByChaincodeName map[string]*BlockChaincodeEvents) { - notifier.lock.Lock() - defer notifier.lock.Unlock() - - for chaincodeName, events := range eventsByChaincodeName { - for listener := range notifier.listenersByChaincodeName[chaincodeName] { - listener.receive(events) - } - } -} - -func (notifier *chaincodeEventNotifier) registerListener(done <-chan struct{}, chaincodeName string) <-chan *BlockChaincodeEvents { - notifyChannel := make(chan *BlockChaincodeEvents, 100) // Avoid blocking by buffering a number of blocks - - notifier.lock.Lock() - defer notifier.lock.Unlock() - - if notifier.closed { - close(notifyChannel) - } else { - listener := &chaincodeEventListener{ - done: done, - notifyChannel: notifyChannel, - } - notifier.listenersForChaincodeName(chaincodeName)[listener] = struct{}{} - } - - return notifyChannel -} - -func (notifier *chaincodeEventNotifier) listenersForChaincodeName(chaincodeName string) chaincodeEventListenerSet { - listeners, exists := notifier.listenersByChaincodeName[chaincodeName] - if !exists { - listeners = make(chaincodeEventListenerSet) - notifier.listenersByChaincodeName[chaincodeName] = listeners - } - - return listeners -} - -func (notifier *chaincodeEventNotifier) Close() { - notifier.lock.Lock() - defer notifier.lock.Unlock() - - for _, listeners := range notifier.listenersByChaincodeName { - for listener := range listeners { - listener.close() - } - } - - notifier.listenersByChaincodeName = nil - notifier.closed = true -} - -type chaincodeEventListener struct { - done <-chan struct{} - notifyChannel chan<- *BlockChaincodeEvents -} - -func (listener *chaincodeEventListener) isDone() bool { - select { - case <-listener.done: - return true - default: - return false - } -} - -func (listener *chaincodeEventListener) close() { - close(listener.notifyChannel) -} - -func (listener *chaincodeEventListener) receive(events *BlockChaincodeEvents) { - listener.notifyChannel <- events -} diff --git a/internal/pkg/gateway/commit/eventer.go b/internal/pkg/gateway/commit/eventer.go deleted file mode 100644 index 8f484d8be7f..00000000000 --- a/internal/pkg/gateway/commit/eventer.go +++ /dev/null @@ -1,23 +0,0 @@ -/* -Copyright IBM Corp. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package commit - -import "context" - -type Eventer struct { - notifier *Notifier -} - -func NewEventer(notifier *Notifier) *Eventer { - return &Eventer{ - notifier: notifier, - } -} - -func (e *Eventer) ChaincodeEvents(ctx context.Context, channelName string, chaincodeName string) (<-chan *BlockChaincodeEvents, error) { - return e.notifier.notifyChaincodeEvents(ctx.Done(), channelName, chaincodeName) -} diff --git a/internal/pkg/gateway/commit/eventer_test.go b/internal/pkg/gateway/commit/eventer_test.go deleted file mode 100644 index b9a14a5ba37..00000000000 --- a/internal/pkg/gateway/commit/eventer_test.go +++ /dev/null @@ -1,259 +0,0 @@ -/* -Copyright IBM Corp. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package commit - -import ( - "context" - "testing" - - "github.com/hyperledger/fabric-protos-go/peer" - "github.com/hyperledger/fabric/core/ledger" - "github.com/hyperledger/fabric/internal/pkg/gateway/commit/mocks" - "github.com/pkg/errors" - "github.com/stretchr/testify/require" -) - -func TestEventer(t *testing.T) { - t.Run("realtime events", func(t *testing.T) { - t.Run("returns error from notification supplier", func(t *testing.T) { - supplier := &mocks.NotificationSupplier{} - supplier.CommitNotificationsReturns(nil, errors.New("MY_ERROR")) - notifier := NewNotifier(supplier) - defer notifier.close() - eventer := NewEventer(notifier) - - _, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - - require.ErrorContains(t, err, "MY_ERROR") - }) - - t.Run("delivers events for matching chaincode", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 1) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - eventReceive, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - chaincodeEvent := newTestChaincodeEvent("CHAINCODE_NAME") - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - actual := <-eventReceive - - expectedEvents := []*peer.ChaincodeEvent{ - chaincodeEvent, - } - require.Equal(t, uint64(1), actual.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual.Events) - }) - - t.Run("ignores events for non-matching chaincode", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 1) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - eventReceive, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - chaincodeEvent := newTestChaincodeEvent("CHAINCODE_NAME") - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - { - ChaincodeEventData: assertMarshallProto(t, newTestChaincodeEvent("WRONG")), - }, - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - actual := <-eventReceive - - expectedEvents := []*peer.ChaincodeEvent{ - chaincodeEvent, - chaincodeEvent, - } - require.Equal(t, uint64(1), actual.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual.Events) - }) - - t.Run("delivers events for multiple blocks", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 1) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - eventReceive, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - chaincodeEvent := newTestChaincodeEvent("CHAINCODE_NAME") - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - - expectedEvents := []*peer.ChaincodeEvent{ - chaincodeEvent, - } - - actual1 := <-eventReceive - require.Equal(t, uint64(1), actual1.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual1.Events) - - commitSend <- &ledger.CommitNotification{ - BlockNumber: 2, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - actual2 := <-eventReceive - - require.Equal(t, uint64(2), actual2.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual2.Events) - }) - - t.Run("ignores blocks with no events", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 2) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - eventReceive, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - chaincodeEvent := newTestChaincodeEvent("CHAINCODE_NAME") - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - } - commitSend <- &ledger.CommitNotification{ - BlockNumber: 2, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - - actual := <-eventReceive - - expectedEvents := []*peer.ChaincodeEvent{ - chaincodeEvent, - } - require.Equal(t, uint64(2), actual.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual.Events) - }) - - t.Run("ignores blocks with no events matching chaincode name", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 2) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - eventReceive, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - chaincodeEvent := newTestChaincodeEvent("CHAINCODE_NAME") - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, newTestChaincodeEvent("WRONG")), - }, - }, - } - commitSend <- &ledger.CommitNotification{ - BlockNumber: 2, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - - actual := <-eventReceive - - expectedEvents := []*peer.ChaincodeEvent{ - chaincodeEvent, - } - require.Equal(t, uint64(2), actual.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual.Events) - }) - - t.Run("delivers events to multiple listeners", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 1) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - eventReceive1, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - eventReceive2, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - chaincodeEvent := newTestChaincodeEvent("CHAINCODE_NAME") - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - actual1 := <-eventReceive1 - actual2 := <-eventReceive2 - - expectedEvents := []*peer.ChaincodeEvent{ - chaincodeEvent, - } - require.Equal(t, uint64(1), actual1.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual1.Events) - require.Equal(t, uint64(1), actual2.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual2.Events) - }) - - t.Run("stops listening when done channel is closed", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 1) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - ctx, cancel := context.WithCancel(context.Background()) - eventReceive, err := eventer.ChaincodeEvents(ctx, "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - cancel() - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, newTestChaincodeEvent("CHAINCODE_NAME")), - }, - }, - } - _, ok := <-eventReceive - - require.False(t, ok, "Expected notification channel to be closed but receive was successful") - }) - }) -} diff --git a/internal/pkg/gateway/commit/notifier.go b/internal/pkg/gateway/commit/notifier.go index dbcb01a1734..7d9e3280c52 100644 --- a/internal/pkg/gateway/commit/notifier.go +++ b/internal/pkg/gateway/commit/notifier.go @@ -19,9 +19,8 @@ type NotificationSupplier interface { } type notifiers struct { - block *blockNotifier - status *statusNotifier - chaincodeEvents *chaincodeEventNotifier + block *blockNotifier + status *statusNotifier } // Notifier provides notification of transaction commits. @@ -53,16 +52,6 @@ func (n *Notifier) notifyStatus(done <-chan struct{}, channelName string, transa return notifyChannel, nil } -func (n *Notifier) notifyChaincodeEvents(done <-chan struct{}, channelName string, chaincodeName string) (<-chan *BlockChaincodeEvents, error) { - notifiers, err := n.notifiersForChannel(channelName) - if err != nil { - return nil, err - } - - notifyChannel := notifiers.chaincodeEvents.registerListener(done, chaincodeName) - return notifyChannel, nil -} - // close the notifier. This closes all notification channels obtained from this notifier. Behavior is undefined after // closing and the notifier should not be used. func (n *Notifier) close() { @@ -86,12 +75,10 @@ func (n *Notifier) notifiersForChannel(channelName string) (*notifiers, error) { } statusNotifier := newStatusNotifier() - chaincodeEventNotifier := newChaincodeEventNotifier() - blockNotifier := newBlockNotifier(n.cancel, commitChannel, statusNotifier, chaincodeEventNotifier) + blockNotifier := newBlockNotifier(n.cancel, commitChannel, statusNotifier) result = ¬ifiers{ - block: blockNotifier, - status: statusNotifier, - chaincodeEvents: chaincodeEventNotifier, + block: blockNotifier, + status: statusNotifier, } n.notifiersByChannel[channelName] = result diff --git a/internal/pkg/gateway/event/block.go b/internal/pkg/gateway/event/block.go new file mode 100644 index 00000000000..e65c5c2fb8d --- /dev/null +++ b/internal/pkg/gateway/event/block.go @@ -0,0 +1,100 @@ +/* +Copyright IBM Corp. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package event + +import ( + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/peer" +) + +type Block struct { + block *common.Block + transactions []*Transaction +} + +func NewBlock(block *common.Block) *Block { + return &Block{ + block: block, + } +} + +func (b *Block) Number() uint64 { + return b.block.GetHeader().GetNumber() +} + +func (b *Block) Transactions() ([]*Transaction, error) { + var err error + + if b.transactions == nil { + b.transactions, err = b.readTransactions() + } + + return b.transactions, err +} + +func (b *Block) readTransactions() ([]*Transaction, error) { + transactions := make([]*Transaction, 0) + + txPayloads, err := b.payloads() + if err != nil { + return nil, err + } + + for i, payload := range txPayloads { + header := &common.ChannelHeader{} + if err := proto.Unmarshal(payload.GetHeader().GetChannelHeader(), header); err != nil { + return nil, err + } + + if header.GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION) { + transaction := &Transaction{ + parent: b, + payload: payload, + id: header.GetTxId(), + timestamp: header.GetTimestamp(), + status: b.statusCode(i), + } + transactions = append(transactions, transaction) + } + } + + return transactions, nil +} + +func (b *Block) payloads() ([]*common.Payload, error) { + var payloads []*common.Payload + + for _, envelopeBytes := range b.block.GetData().GetData() { + envelope := &common.Envelope{} + if err := proto.Unmarshal(envelopeBytes, envelope); err != nil { + return nil, err + } + + payload := &common.Payload{} + if err := proto.Unmarshal(envelope.Payload, payload); err != nil { + return nil, err + } + + payloads = append(payloads, payload) + } + + return payloads, nil +} + +func (b *Block) statusCode(txIndex int) peer.TxValidationCode { + metadata := b.block.GetMetadata().GetMetadata() + if int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) >= len(metadata) { + return peer.TxValidationCode_INVALID_OTHER_REASON + } + + statusCodes := metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] + if txIndex >= len(statusCodes) { + return peer.TxValidationCode_INVALID_OTHER_REASON + } + + return peer.TxValidationCode(statusCodes[txIndex]) +} diff --git a/internal/pkg/gateway/event/blockiterator.go b/internal/pkg/gateway/event/blockiterator.go new file mode 100644 index 00000000000..56c372c2b02 --- /dev/null +++ b/internal/pkg/gateway/event/blockiterator.go @@ -0,0 +1,40 @@ +/* +Copyright IBM Corp. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package event + +import ( + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/ledger" + "github.com/pkg/errors" +) + +type BlockIterator struct { + ledgerIter ledger.ResultsIterator +} + +func NewBlockIterator(iterator ledger.ResultsIterator) *BlockIterator { + return &BlockIterator{ + ledgerIter: iterator, + } +} + +func (iter *BlockIterator) Next() (*Block, error) { + result, err := iter.ledgerIter.Next() + if err != nil { + return nil, err + } + + switch block := result.(type) { + case *common.Block: + return NewBlock(block), nil + default: + return nil, errors.Errorf("unexpected block type: %T", result) + } +} + +func (iter *BlockIterator) Close() { + iter.ledgerIter.Close() +} diff --git a/internal/pkg/gateway/event/chaincode.go b/internal/pkg/gateway/event/chaincode.go new file mode 100644 index 00000000000..264a26de718 --- /dev/null +++ b/internal/pkg/gateway/event/chaincode.go @@ -0,0 +1,35 @@ +/* +Copyright IBM Corp. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package event + +import ( + "github.com/hyperledger/fabric-protos-go/peer" +) + +type ChaincodeEvent struct { + parent *Transaction + message *peer.ChaincodeEvent +} + +func (event *ChaincodeEvent) Transaction() *Transaction { + return event.parent +} + +func (event *ChaincodeEvent) ChaincodeID() string { + return event.message.ChaincodeId +} + +func (event *ChaincodeEvent) EventName() string { + return event.message.EventName +} + +func (event *ChaincodeEvent) Payload() []byte { + return event.message.Payload +} + +func (event *ChaincodeEvent) ProtoMessage() *peer.ChaincodeEvent { + return event.message +} diff --git a/internal/pkg/gateway/event/chaincodeiterator.go b/internal/pkg/gateway/event/chaincodeiterator.go new file mode 100644 index 00000000000..42ff1700876 --- /dev/null +++ b/internal/pkg/gateway/event/chaincodeiterator.go @@ -0,0 +1,83 @@ +/* +Copyright IBM Corp. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package event + +import ( + "github.com/hyperledger/fabric-protos-go/gateway" + "github.com/hyperledger/fabric-protos-go/peer" + "github.com/hyperledger/fabric/common/ledger" +) + +type ChaincodeEventsIterator struct { + blockIter *BlockIterator +} + +func NewChaincodeEventsIterator(iterator ledger.ResultsIterator) *ChaincodeEventsIterator { + return &ChaincodeEventsIterator{ + blockIter: NewBlockIterator(iterator), + } +} + +func (iter *ChaincodeEventsIterator) Next() (*gateway.ChaincodeEventsResponse, error) { + for { + result, err := iter.nextBlock() + if err != nil { + return nil, err + } + + if len(result.Events) > 0 { + return result, nil + } + } +} + +func (iter *ChaincodeEventsIterator) nextBlock() (*gateway.ChaincodeEventsResponse, error) { + block, err := iter.blockIter.Next() + if err != nil { + return nil, err + } + + events, err := chaincodeEventsFromBlock(block) + if err != nil { + return nil, err + } + + result := &gateway.ChaincodeEventsResponse{ + BlockNumber: block.Number(), + Events: events, + } + return result, nil +} + +func chaincodeEventsFromBlock(block *Block) ([]*peer.ChaincodeEvent, error) { + transactions, err := block.Transactions() + if err != nil { + return nil, err + } + + var results []*peer.ChaincodeEvent + + for _, transaction := range transactions { + if !transaction.Valid() { + continue + } + + events, err := transaction.ChaincodeEvents() + if err != nil { + return nil, err + } + + for _, event := range events { + results = append(results, event.ProtoMessage()) + } + } + + return results, nil +} + +func (iter *ChaincodeEventsIterator) Close() { + iter.blockIter.Close() +} diff --git a/internal/pkg/gateway/event/iterator_test.go b/internal/pkg/gateway/event/iterator_test.go new file mode 100644 index 00000000000..0a842daf710 --- /dev/null +++ b/internal/pkg/gateway/event/iterator_test.go @@ -0,0 +1,294 @@ +/* +Copyright IBM Corp. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package event_test + +import ( + "fmt" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/gateway" + "github.com/hyperledger/fabric-protos-go/peer" + "github.com/hyperledger/fabric/common/ledger" + "github.com/hyperledger/fabric/internal/pkg/gateway/event" + "github.com/hyperledger/fabric/internal/pkg/gateway/event/mocks" + "github.com/hyperledger/fabric/protoutil" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +//go:generate counterfeiter -o mocks/resultsiterator.go --fake-name ResultsIterator . mockResultsIterator +type mockResultsIterator interface { + ledger.ResultsIterator +} + +func TestIterators(t *testing.T) { + now := time.Now() + transactionId := "TRANSACTION_ID" + + chaincodeEvent := &peer.ChaincodeEvent{ + ChaincodeId: "CHAINCODE_ID", + TxId: transactionId, + EventName: "EVENT_NAME", + Payload: []byte("PAYLOAD"), + } + + txEnvelope := &common.Envelope{ + Payload: protoutil.MarshalOrPanic(&common.Payload{ + Header: &common.Header{ + ChannelHeader: protoutil.MarshalOrPanic(&common.ChannelHeader{ + Type: int32(common.HeaderType_ENDORSER_TRANSACTION), + Timestamp: ×tamp.Timestamp{ + Seconds: now.Unix(), + Nanos: int32(now.Nanosecond()), + }, + TxId: transactionId, + }), + }, + Data: protoutil.MarshalOrPanic(&peer.Transaction{ + Actions: []*peer.TransactionAction{ + { + Payload: protoutil.MarshalOrPanic(&peer.ChaincodeActionPayload{ + Action: &peer.ChaincodeEndorsedAction{ + ProposalResponsePayload: protoutil.MarshalOrPanic(&peer.ProposalResponsePayload{ + Extension: protoutil.MarshalOrPanic(&peer.ChaincodeAction{ + Events: protoutil.MarshalOrPanic(chaincodeEvent), + }), + }), + }, + }), + }, + }, + }), + }), + } + + blockProto := &common.Block{ + Header: &common.BlockHeader{ + Number: 1337, + }, + Metadata: &common.BlockMetadata{ + Metadata: [][]byte{ + nil, + nil, + { + byte(peer.TxValidationCode_MVCC_READ_CONFLICT), + byte(peer.TxValidationCode_VALID), + byte(peer.TxValidationCode_VALID), + }, + nil, + nil, + }, + }, + Data: &common.BlockData{ + Data: [][]byte{ + protoutil.MarshalOrPanic(txEnvelope), + protoutil.MarshalOrPanic(txEnvelope), + protoutil.MarshalOrPanic(&common.Envelope{ + Payload: protoutil.MarshalOrPanic(&common.Payload{ + Header: &common.Header{ + ChannelHeader: protoutil.MarshalOrPanic(&common.ChannelHeader{ + Type: int32(common.HeaderType_CONFIG_UPDATE), + }), + }, + }), + }), + }, + }, + } + + const invalidTxIndex = 0 + const validTxIndex = 1 + + assertExpectedBlock := func(t *testing.T, block *event.Block) { + require.NotNil(t, block, "block") + require.EqualValues(t, blockProto.GetHeader().GetNumber(), block.Number(), "block.Number()") + + transactions, err := block.Transactions() + require.NoError(t, err, "Transactions()") + require.Len(t, transactions, 2, "transactions") + + for txIndex, transaction := range transactions { + require.Equal(t, block, transaction.Block(), "transaction[%d].Block()", txIndex) + require.Equal(t, transactionId, transaction.ID(), "transaction[%d].ID()", txIndex) + require.EqualValues(t, now.Unix(), transaction.Timestamp().Seconds, "transaction[%d].Timestamp.Seconds", txIndex) + require.EqualValues(t, now.Nanosecond(), transaction.Timestamp().Nanos, "transaction[%d].Tomestamp.Nanos", txIndex) + + events, err := transaction.ChaincodeEvents() + require.NoError(t, err, "ChaincodeEvents()") + require.Len(t, events, 1, "chaincodeEvents") + + for eventIndex, event := range events { + require.Equal(t, transaction, event.Transaction(), "transaction[%d].ChaincodeEvents()[%d].Transaction()", txIndex, eventIndex) + require.Equal(t, chaincodeEvent.GetChaincodeId(), event.ChaincodeID(), "transaction[%d].ChaincodeEvents()[%d].ChaincodeID()", txIndex, eventIndex) + require.Equal(t, chaincodeEvent.GetEventName(), event.EventName(), "transaction[%d].ChaincodeEvents()[%d].EventName()", txIndex, eventIndex) + require.EqualValues(t, chaincodeEvent.GetPayload(), event.Payload(), "transaction[%d].ChaincodeEvents()[%d].Payload()", txIndex, eventIndex) + require.True(t, proto.Equal(chaincodeEvent, event.ProtoMessage()), "transaction[%d].ChaincodeEvents()[%d].ProtoMessage(): %v", txIndex, eventIndex, event.ProtoMessage()) + } + } + } + + t.Run("BlockIterator", func(t *testing.T) { + t.Run("Next", func(t *testing.T) { + t.Run("returns error from wrapped iterator", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + resultIter.NextReturns(nil, errors.New("MY_ERROR")) + + blockIter := event.NewBlockIterator(resultIter) + _, err := blockIter.Next() + + require.ErrorContains(t, err, "MY_ERROR") + }) + + t.Run("returns error if wrapped iterator returns unexpected type", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + result := &common.Envelope{} + resultIter.NextReturns(result, nil) + + blockIter := event.NewBlockIterator(resultIter) + _, err := blockIter.Next() + + require.ErrorContains(t, err, fmt.Sprintf("%T", result)) + }) + + t.Run("returns a block with no transactions", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + result := &common.Block{ + Header: &common.BlockHeader{ + Number: 418, + }, + } + resultIter.NextReturns(result, nil) + + blockIter := event.NewBlockIterator(resultIter) + block, err := blockIter.Next() + + require.NoError(t, err, "Next()") + require.NotNil(t, block, "block") + require.EqualValues(t, result.GetHeader().GetNumber(), block.Number(), "Number()") + + transactions, err := block.Transactions() + require.NoError(t, err, "Transactions()") + require.Len(t, transactions, 0, "transactions") + }) + + t.Run("returns a block with invalid transaction", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + resultIter.NextReturns(blockProto, nil) + + blockIter := event.NewBlockIterator(resultIter) + block, err := blockIter.Next() + + require.NoError(t, err, "Next()") + assertExpectedBlock(t, block) + + transactions, _ := block.Transactions() + transaction := transactions[invalidTxIndex] + require.Equal(t, peer.TxValidationCode_MVCC_READ_CONFLICT, transaction.Status()) + require.False(t, transaction.Valid(), "Valid()") + }) + + t.Run("returns a block with valid transaction", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + resultIter.NextReturns(blockProto, nil) + + blockIter := event.NewBlockIterator(resultIter) + block, err := blockIter.Next() + + require.NoError(t, err, "Next()") + assertExpectedBlock(t, block) + + transactions, _ := block.Transactions() + transaction := transactions[validTxIndex] + require.Equal(t, peer.TxValidationCode_VALID, transaction.Status()) + require.True(t, transaction.Valid(), "Valid()") + }) + }) + + t.Run("Close", func(t *testing.T) { + t.Run("closes wrapped iterator", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + + blockIter := event.NewBlockIterator(resultIter) + blockIter.Close() + + require.Equal(t, 1, resultIter.CloseCallCount()) + }) + }) + }) + + t.Run("ChaincodeEventsIterator", func(t *testing.T) { + t.Run("Next", func(t *testing.T) { + t.Run("returns error from wrapped iterator", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + resultIter.NextReturns(nil, errors.New("MY_ERROR")) + + eventsIter := event.NewChaincodeEventsIterator(resultIter) + _, err := eventsIter.Next() + + require.ErrorContains(t, err, "MY_ERROR") + }) + }) + + t.Run("only returns events for valid transactions", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + resultIter.NextReturns(blockProto, nil) + + eventsIter := event.NewChaincodeEventsIterator(resultIter) + actual, err := eventsIter.Next() + + require.NoError(t, err, "Next()") + require.NotNil(t, actual, "events") + + expected := &gateway.ChaincodeEventsResponse{ + BlockNumber: blockProto.GetHeader().GetNumber(), + Events: []*peer.ChaincodeEvent{ + chaincodeEvent, + }, + } + require.True(t, proto.Equal(expected, actual), "ChaincodeEventsResponse: %v", actual) + }) + + t.Run("skips blocks with no valid chaincode events", func(t *testing.T) { + emptyBlock := &common.Block{ + Header: &common.BlockHeader{ + Number: 418, + }, + } + resultIter := &mocks.ResultsIterator{} + resultIter.NextReturnsOnCall(0, emptyBlock, nil) + resultIter.NextReturnsOnCall(1, blockProto, nil) + + eventsIter := event.NewChaincodeEventsIterator(resultIter) + actual, err := eventsIter.Next() + + require.NoError(t, err, "Next()") + require.NotNil(t, actual, "events") + + expected := &gateway.ChaincodeEventsResponse{ + BlockNumber: blockProto.GetHeader().GetNumber(), + Events: []*peer.ChaincodeEvent{ + chaincodeEvent, + }, + } + require.True(t, proto.Equal(expected, actual), "ChaincodeEventsResponse: %v", actual) + }) + + t.Run("Close", func(t *testing.T) { + t.Run("closes wrapped iterator", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + + eventsIter := event.NewChaincodeEventsIterator(resultIter) + eventsIter.Close() + + require.Equal(t, 1, resultIter.CloseCallCount()) + }) + }) + }) +} diff --git a/internal/pkg/gateway/event/mocks/resultsiterator.go b/internal/pkg/gateway/event/mocks/resultsiterator.go new file mode 100644 index 00000000000..a8b0007f4ff --- /dev/null +++ b/internal/pkg/gateway/event/mocks/resultsiterator.go @@ -0,0 +1,135 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric/common/ledger" +) + +type ResultsIterator struct { + CloseStub func() + closeMutex sync.RWMutex + closeArgsForCall []struct { + } + NextStub func() (ledger.QueryResult, error) + nextMutex sync.RWMutex + nextArgsForCall []struct { + } + nextReturns struct { + result1 ledger.QueryResult + result2 error + } + nextReturnsOnCall map[int]struct { + result1 ledger.QueryResult + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *ResultsIterator) Close() { + fake.closeMutex.Lock() + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + fake.CloseStub() + } +} + +func (fake *ResultsIterator) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *ResultsIterator) CloseCalls(stub func()) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + +func (fake *ResultsIterator) Next() (ledger.QueryResult, error) { + fake.nextMutex.Lock() + ret, specificReturn := fake.nextReturnsOnCall[len(fake.nextArgsForCall)] + fake.nextArgsForCall = append(fake.nextArgsForCall, struct { + }{}) + stub := fake.NextStub + fakeReturns := fake.nextReturns + fake.recordInvocation("Next", []interface{}{}) + fake.nextMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *ResultsIterator) NextCallCount() int { + fake.nextMutex.RLock() + defer fake.nextMutex.RUnlock() + return len(fake.nextArgsForCall) +} + +func (fake *ResultsIterator) NextCalls(stub func() (ledger.QueryResult, error)) { + fake.nextMutex.Lock() + defer fake.nextMutex.Unlock() + fake.NextStub = stub +} + +func (fake *ResultsIterator) NextReturns(result1 ledger.QueryResult, result2 error) { + fake.nextMutex.Lock() + defer fake.nextMutex.Unlock() + fake.NextStub = nil + fake.nextReturns = struct { + result1 ledger.QueryResult + result2 error + }{result1, result2} +} + +func (fake *ResultsIterator) NextReturnsOnCall(i int, result1 ledger.QueryResult, result2 error) { + fake.nextMutex.Lock() + defer fake.nextMutex.Unlock() + fake.NextStub = nil + if fake.nextReturnsOnCall == nil { + fake.nextReturnsOnCall = make(map[int]struct { + result1 ledger.QueryResult + result2 error + }) + } + fake.nextReturnsOnCall[i] = struct { + result1 ledger.QueryResult + result2 error + }{result1, result2} +} + +func (fake *ResultsIterator) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + fake.nextMutex.RLock() + defer fake.nextMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *ResultsIterator) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/internal/pkg/gateway/event/transaction.go b/internal/pkg/gateway/event/transaction.go new file mode 100644 index 00000000000..f128508a20a --- /dev/null +++ b/internal/pkg/gateway/event/transaction.go @@ -0,0 +1,99 @@ +/* +Copyright IBM Corp. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package event + +import ( + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/peer" +) + +type Transaction struct { + parent *Block + payload *common.Payload + id string + timestamp *timestamp.Timestamp + status peer.TxValidationCode + chaincodeEvents []*ChaincodeEvent +} + +func (tx *Transaction) Block() *Block { + return tx.parent +} + +func (tx *Transaction) ID() string { + return tx.id +} + +func (tx *Transaction) Timestamp() *timestamp.Timestamp { + return tx.timestamp +} + +func (tx *Transaction) Status() peer.TxValidationCode { + return tx.status +} + +func (tx *Transaction) Valid() bool { + return tx.status == peer.TxValidationCode_VALID +} + +func (tx *Transaction) ChaincodeEvents() ([]*ChaincodeEvent, error) { + var err error + + if tx.chaincodeEvents == nil { + tx.chaincodeEvents, err = tx.readChaincodeEvents() + } + + return tx.chaincodeEvents, err +} + +func (tx *Transaction) readChaincodeEvents() ([]*ChaincodeEvent, error) { + transaction := &peer.Transaction{} + if err := proto.Unmarshal(tx.payload.GetData(), transaction); err != nil { + return nil, err + } + + chaincodeEvents := make([]*ChaincodeEvent, 0) + + for _, action := range transaction.GetActions() { + actionPayload := &peer.ChaincodeActionPayload{} + if err := proto.Unmarshal(action.GetPayload(), actionPayload); err != nil { + continue + } + + responsePayload := &peer.ProposalResponsePayload{} + if err := proto.Unmarshal(actionPayload.GetAction().GetProposalResponsePayload(), responsePayload); err != nil { + continue + } + + action := &peer.ChaincodeAction{} + if err := proto.Unmarshal(responsePayload.GetExtension(), action); err != nil { + continue + } + + event := &peer.ChaincodeEvent{} + if err := proto.Unmarshal(action.GetEvents(), event); err != nil { + continue + } + + if !validChaincodeEvent(event) { + continue + } + + chaincodeEvent := &ChaincodeEvent{ + parent: tx, + message: event, + } + chaincodeEvents = append(chaincodeEvents, chaincodeEvent) + } + + return chaincodeEvents, nil +} + +func validChaincodeEvent(event *peer.ChaincodeEvent) bool { + return len(event.GetChaincodeId()) > 0 && len(event.GetEventName()) > 0 && len(event.GetTxId()) > 0 +} diff --git a/internal/pkg/gateway/gateway.go b/internal/pkg/gateway/gateway.go index be957321241..fa6e4bbdebf 100644 --- a/internal/pkg/gateway/gateway.go +++ b/internal/pkg/gateway/gateway.go @@ -10,6 +10,7 @@ import ( peerproto "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/common/ledger" "github.com/hyperledger/fabric/core/peer" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/internal/pkg/gateway/commit" @@ -21,12 +22,12 @@ var logger = flogging.MustGetLogger("gateway") // Server represents the GRPC server for the Gateway. type Server struct { - registry *registry - commitFinder CommitFinder - eventer Eventer - policy ACLChecker - options config.Options - logger *flogging.FabricLogger + registry *registry + commitFinder CommitFinder + policy ACLChecker + options config.Options + logger *flogging.FabricLogger + ledgerProvider LedgerProvider } type EndorserServerAdapter struct { @@ -41,14 +42,14 @@ type CommitFinder interface { TransactionStatus(ctx context.Context, channelName string, transactionID string) (*commit.Status, error) } -type Eventer interface { - ChaincodeEvents(ctx context.Context, channelName string, chaincodeName string) (<-chan *commit.BlockChaincodeEvents, error) -} - type ACLChecker interface { CheckACL(policyName string, channelName string, data interface{}) error } +type LedgerProvider interface { + Ledger(channelName string) (ledger.Ledger, error) +} + // CreateServer creates an embedded instance of the Gateway. func CreateServer(localEndorser peerproto.EndorserServer, discovery Discovery, peerInstance *peer.Peer, policy ACLChecker, localMSPID string, options config.Options) *Server { adapter := &peerAdapter{ @@ -62,8 +63,8 @@ func CreateServer(localEndorser peerproto.EndorserServer, discovery Discovery, p }, discovery, commit.NewFinder(adapter, notifier), - commit.NewEventer(notifier), policy, + adapter, peerInstance.GossipService.SelfMembershipInfo().PKIid, peerInstance.GossipService.SelfMembershipInfo().Endpoint, localMSPID, @@ -71,8 +72,8 @@ func CreateServer(localEndorser peerproto.EndorserServer, discovery Discovery, p ) } -func newServer(localEndorser peerproto.EndorserClient, discovery Discovery, finder CommitFinder, eventer Eventer, policy ACLChecker, localPKIID common.PKIidType, localEndpoint, localMSPID string, options config.Options) *Server { - gwServer := &Server{ +func newServer(localEndorser peerproto.EndorserClient, discovery Discovery, finder CommitFinder, policy ACLChecker, ledgerProvider LedgerProvider, localPKIID common.PKIidType, localEndpoint, localMSPID string, options config.Options) *Server { + return &Server{ registry: ®istry{ localEndorser: &endorser{client: localEndorser, endpointConfig: &endpointConfig{pkiid: localPKIID, address: localEndpoint, mspid: localMSPID}}, discovery: discovery, @@ -83,12 +84,10 @@ func newServer(localEndorser peerproto.EndorserClient, discovery Discovery, find tlsRootCerts: map[string][][]byte{}, channelsInitialized: map[string]bool{}, }, - commitFinder: finder, - eventer: eventer, - policy: policy, - options: options, - logger: logger, + commitFinder: finder, + policy: policy, + options: options, + logger: logger, + ledgerProvider: ledgerProvider, } - - return gwServer } diff --git a/internal/pkg/gateway/mocks/discovery.go b/internal/pkg/gateway/mocks/discovery.go index 77d25ce34c2..782c43162c4 100644 --- a/internal/pkg/gateway/mocks/discovery.go +++ b/internal/pkg/gateway/mocks/discovery.go @@ -27,8 +27,9 @@ type Discovery struct { } IdentityInfoStub func() api.PeerIdentitySet identityInfoMutex sync.RWMutex - identityInfoArgsForCall []struct{} - identityInfoReturns struct { + identityInfoArgsForCall []struct { + } + identityInfoReturns struct { result1 api.PeerIdentitySet } identityInfoReturnsOnCall map[int]struct { @@ -130,7 +131,8 @@ func (fake *Discovery) ConfigReturnsOnCall(i int, result1 *discovery.ConfigResul func (fake *Discovery) IdentityInfo() api.PeerIdentitySet { fake.identityInfoMutex.Lock() ret, specificReturn := fake.identityInfoReturnsOnCall[len(fake.identityInfoArgsForCall)] - fake.identityInfoArgsForCall = append(fake.identityInfoArgsForCall, struct{}{}) + fake.identityInfoArgsForCall = append(fake.identityInfoArgsForCall, struct { + }{}) stub := fake.IdentityInfoStub fakeReturns := fake.identityInfoReturns fake.recordInvocation("IdentityInfo", []interface{}{}) diff --git a/internal/pkg/gateway/mocks/eventer.go b/internal/pkg/gateway/mocks/eventer.go deleted file mode 100644 index 3396f14218d..00000000000 --- a/internal/pkg/gateway/mocks/eventer.go +++ /dev/null @@ -1,119 +0,0 @@ -// Code generated by counterfeiter. DO NOT EDIT. -package mocks - -import ( - "context" - "sync" - - "github.com/hyperledger/fabric/internal/pkg/gateway/commit" -) - -type Eventer struct { - ChaincodeEventsStub func(context.Context, string, string) (<-chan *commit.BlockChaincodeEvents, error) - chaincodeEventsMutex sync.RWMutex - chaincodeEventsArgsForCall []struct { - arg1 context.Context - arg2 string - arg3 string - } - chaincodeEventsReturns struct { - result1 <-chan *commit.BlockChaincodeEvents - result2 error - } - chaincodeEventsReturnsOnCall map[int]struct { - result1 <-chan *commit.BlockChaincodeEvents - result2 error - } - invocations map[string][][]interface{} - invocationsMutex sync.RWMutex -} - -func (fake *Eventer) ChaincodeEvents(arg1 context.Context, arg2 string, arg3 string) (<-chan *commit.BlockChaincodeEvents, error) { - fake.chaincodeEventsMutex.Lock() - ret, specificReturn := fake.chaincodeEventsReturnsOnCall[len(fake.chaincodeEventsArgsForCall)] - fake.chaincodeEventsArgsForCall = append(fake.chaincodeEventsArgsForCall, struct { - arg1 context.Context - arg2 string - arg3 string - }{arg1, arg2, arg3}) - stub := fake.ChaincodeEventsStub - fakeReturns := fake.chaincodeEventsReturns - fake.recordInvocation("ChaincodeEvents", []interface{}{arg1, arg2, arg3}) - fake.chaincodeEventsMutex.Unlock() - if stub != nil { - return stub(arg1, arg2, arg3) - } - if specificReturn { - return ret.result1, ret.result2 - } - return fakeReturns.result1, fakeReturns.result2 -} - -func (fake *Eventer) ChaincodeEventsCallCount() int { - fake.chaincodeEventsMutex.RLock() - defer fake.chaincodeEventsMutex.RUnlock() - return len(fake.chaincodeEventsArgsForCall) -} - -func (fake *Eventer) ChaincodeEventsCalls(stub func(context.Context, string, string) (<-chan *commit.BlockChaincodeEvents, error)) { - fake.chaincodeEventsMutex.Lock() - defer fake.chaincodeEventsMutex.Unlock() - fake.ChaincodeEventsStub = stub -} - -func (fake *Eventer) ChaincodeEventsArgsForCall(i int) (context.Context, string, string) { - fake.chaincodeEventsMutex.RLock() - defer fake.chaincodeEventsMutex.RUnlock() - argsForCall := fake.chaincodeEventsArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 -} - -func (fake *Eventer) ChaincodeEventsReturns(result1 <-chan *commit.BlockChaincodeEvents, result2 error) { - fake.chaincodeEventsMutex.Lock() - defer fake.chaincodeEventsMutex.Unlock() - fake.ChaincodeEventsStub = nil - fake.chaincodeEventsReturns = struct { - result1 <-chan *commit.BlockChaincodeEvents - result2 error - }{result1, result2} -} - -func (fake *Eventer) ChaincodeEventsReturnsOnCall(i int, result1 <-chan *commit.BlockChaincodeEvents, result2 error) { - fake.chaincodeEventsMutex.Lock() - defer fake.chaincodeEventsMutex.Unlock() - fake.ChaincodeEventsStub = nil - if fake.chaincodeEventsReturnsOnCall == nil { - fake.chaincodeEventsReturnsOnCall = make(map[int]struct { - result1 <-chan *commit.BlockChaincodeEvents - result2 error - }) - } - fake.chaincodeEventsReturnsOnCall[i] = struct { - result1 <-chan *commit.BlockChaincodeEvents - result2 error - }{result1, result2} -} - -func (fake *Eventer) Invocations() map[string][][]interface{} { - fake.invocationsMutex.RLock() - defer fake.invocationsMutex.RUnlock() - fake.chaincodeEventsMutex.RLock() - defer fake.chaincodeEventsMutex.RUnlock() - copiedInvocations := map[string][][]interface{}{} - for key, value := range fake.invocations { - copiedInvocations[key] = value - } - return copiedInvocations -} - -func (fake *Eventer) recordInvocation(key string, args []interface{}) { - fake.invocationsMutex.Lock() - defer fake.invocationsMutex.Unlock() - if fake.invocations == nil { - fake.invocations = map[string][][]interface{}{} - } - if fake.invocations[key] == nil { - fake.invocations[key] = [][]interface{}{} - } - fake.invocations[key] = append(fake.invocations[key], args) -} diff --git a/internal/pkg/gateway/mocks/ledger.go b/internal/pkg/gateway/mocks/ledger.go new file mode 100644 index 00000000000..e6dc41e00e0 --- /dev/null +++ b/internal/pkg/gateway/mocks/ledger.go @@ -0,0 +1,294 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/ledger" +) + +type Ledger struct { + CloseStub func() + closeMutex sync.RWMutex + closeArgsForCall []struct { + } + GetBlockByNumberStub func(uint64) (*common.Block, error) + getBlockByNumberMutex sync.RWMutex + getBlockByNumberArgsForCall []struct { + arg1 uint64 + } + getBlockByNumberReturns struct { + result1 *common.Block + result2 error + } + getBlockByNumberReturnsOnCall map[int]struct { + result1 *common.Block + result2 error + } + GetBlockchainInfoStub func() (*common.BlockchainInfo, error) + getBlockchainInfoMutex sync.RWMutex + getBlockchainInfoArgsForCall []struct { + } + getBlockchainInfoReturns struct { + result1 *common.BlockchainInfo + result2 error + } + getBlockchainInfoReturnsOnCall map[int]struct { + result1 *common.BlockchainInfo + result2 error + } + GetBlocksIteratorStub func(uint64) (ledger.ResultsIterator, error) + getBlocksIteratorMutex sync.RWMutex + getBlocksIteratorArgsForCall []struct { + arg1 uint64 + } + getBlocksIteratorReturns struct { + result1 ledger.ResultsIterator + result2 error + } + getBlocksIteratorReturnsOnCall map[int]struct { + result1 ledger.ResultsIterator + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *Ledger) Close() { + fake.closeMutex.Lock() + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + fake.CloseStub() + } +} + +func (fake *Ledger) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *Ledger) CloseCalls(stub func()) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + +func (fake *Ledger) GetBlockByNumber(arg1 uint64) (*common.Block, error) { + fake.getBlockByNumberMutex.Lock() + ret, specificReturn := fake.getBlockByNumberReturnsOnCall[len(fake.getBlockByNumberArgsForCall)] + fake.getBlockByNumberArgsForCall = append(fake.getBlockByNumberArgsForCall, struct { + arg1 uint64 + }{arg1}) + stub := fake.GetBlockByNumberStub + fakeReturns := fake.getBlockByNumberReturns + fake.recordInvocation("GetBlockByNumber", []interface{}{arg1}) + fake.getBlockByNumberMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *Ledger) GetBlockByNumberCallCount() int { + fake.getBlockByNumberMutex.RLock() + defer fake.getBlockByNumberMutex.RUnlock() + return len(fake.getBlockByNumberArgsForCall) +} + +func (fake *Ledger) GetBlockByNumberCalls(stub func(uint64) (*common.Block, error)) { + fake.getBlockByNumberMutex.Lock() + defer fake.getBlockByNumberMutex.Unlock() + fake.GetBlockByNumberStub = stub +} + +func (fake *Ledger) GetBlockByNumberArgsForCall(i int) uint64 { + fake.getBlockByNumberMutex.RLock() + defer fake.getBlockByNumberMutex.RUnlock() + argsForCall := fake.getBlockByNumberArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *Ledger) GetBlockByNumberReturns(result1 *common.Block, result2 error) { + fake.getBlockByNumberMutex.Lock() + defer fake.getBlockByNumberMutex.Unlock() + fake.GetBlockByNumberStub = nil + fake.getBlockByNumberReturns = struct { + result1 *common.Block + result2 error + }{result1, result2} +} + +func (fake *Ledger) GetBlockByNumberReturnsOnCall(i int, result1 *common.Block, result2 error) { + fake.getBlockByNumberMutex.Lock() + defer fake.getBlockByNumberMutex.Unlock() + fake.GetBlockByNumberStub = nil + if fake.getBlockByNumberReturnsOnCall == nil { + fake.getBlockByNumberReturnsOnCall = make(map[int]struct { + result1 *common.Block + result2 error + }) + } + fake.getBlockByNumberReturnsOnCall[i] = struct { + result1 *common.Block + result2 error + }{result1, result2} +} + +func (fake *Ledger) GetBlockchainInfo() (*common.BlockchainInfo, error) { + fake.getBlockchainInfoMutex.Lock() + ret, specificReturn := fake.getBlockchainInfoReturnsOnCall[len(fake.getBlockchainInfoArgsForCall)] + fake.getBlockchainInfoArgsForCall = append(fake.getBlockchainInfoArgsForCall, struct { + }{}) + stub := fake.GetBlockchainInfoStub + fakeReturns := fake.getBlockchainInfoReturns + fake.recordInvocation("GetBlockchainInfo", []interface{}{}) + fake.getBlockchainInfoMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *Ledger) GetBlockchainInfoCallCount() int { + fake.getBlockchainInfoMutex.RLock() + defer fake.getBlockchainInfoMutex.RUnlock() + return len(fake.getBlockchainInfoArgsForCall) +} + +func (fake *Ledger) GetBlockchainInfoCalls(stub func() (*common.BlockchainInfo, error)) { + fake.getBlockchainInfoMutex.Lock() + defer fake.getBlockchainInfoMutex.Unlock() + fake.GetBlockchainInfoStub = stub +} + +func (fake *Ledger) GetBlockchainInfoReturns(result1 *common.BlockchainInfo, result2 error) { + fake.getBlockchainInfoMutex.Lock() + defer fake.getBlockchainInfoMutex.Unlock() + fake.GetBlockchainInfoStub = nil + fake.getBlockchainInfoReturns = struct { + result1 *common.BlockchainInfo + result2 error + }{result1, result2} +} + +func (fake *Ledger) GetBlockchainInfoReturnsOnCall(i int, result1 *common.BlockchainInfo, result2 error) { + fake.getBlockchainInfoMutex.Lock() + defer fake.getBlockchainInfoMutex.Unlock() + fake.GetBlockchainInfoStub = nil + if fake.getBlockchainInfoReturnsOnCall == nil { + fake.getBlockchainInfoReturnsOnCall = make(map[int]struct { + result1 *common.BlockchainInfo + result2 error + }) + } + fake.getBlockchainInfoReturnsOnCall[i] = struct { + result1 *common.BlockchainInfo + result2 error + }{result1, result2} +} + +func (fake *Ledger) GetBlocksIterator(arg1 uint64) (ledger.ResultsIterator, error) { + fake.getBlocksIteratorMutex.Lock() + ret, specificReturn := fake.getBlocksIteratorReturnsOnCall[len(fake.getBlocksIteratorArgsForCall)] + fake.getBlocksIteratorArgsForCall = append(fake.getBlocksIteratorArgsForCall, struct { + arg1 uint64 + }{arg1}) + stub := fake.GetBlocksIteratorStub + fakeReturns := fake.getBlocksIteratorReturns + fake.recordInvocation("GetBlocksIterator", []interface{}{arg1}) + fake.getBlocksIteratorMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *Ledger) GetBlocksIteratorCallCount() int { + fake.getBlocksIteratorMutex.RLock() + defer fake.getBlocksIteratorMutex.RUnlock() + return len(fake.getBlocksIteratorArgsForCall) +} + +func (fake *Ledger) GetBlocksIteratorCalls(stub func(uint64) (ledger.ResultsIterator, error)) { + fake.getBlocksIteratorMutex.Lock() + defer fake.getBlocksIteratorMutex.Unlock() + fake.GetBlocksIteratorStub = stub +} + +func (fake *Ledger) GetBlocksIteratorArgsForCall(i int) uint64 { + fake.getBlocksIteratorMutex.RLock() + defer fake.getBlocksIteratorMutex.RUnlock() + argsForCall := fake.getBlocksIteratorArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *Ledger) GetBlocksIteratorReturns(result1 ledger.ResultsIterator, result2 error) { + fake.getBlocksIteratorMutex.Lock() + defer fake.getBlocksIteratorMutex.Unlock() + fake.GetBlocksIteratorStub = nil + fake.getBlocksIteratorReturns = struct { + result1 ledger.ResultsIterator + result2 error + }{result1, result2} +} + +func (fake *Ledger) GetBlocksIteratorReturnsOnCall(i int, result1 ledger.ResultsIterator, result2 error) { + fake.getBlocksIteratorMutex.Lock() + defer fake.getBlocksIteratorMutex.Unlock() + fake.GetBlocksIteratorStub = nil + if fake.getBlocksIteratorReturnsOnCall == nil { + fake.getBlocksIteratorReturnsOnCall = make(map[int]struct { + result1 ledger.ResultsIterator + result2 error + }) + } + fake.getBlocksIteratorReturnsOnCall[i] = struct { + result1 ledger.ResultsIterator + result2 error + }{result1, result2} +} + +func (fake *Ledger) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + fake.getBlockByNumberMutex.RLock() + defer fake.getBlockByNumberMutex.RUnlock() + fake.getBlockchainInfoMutex.RLock() + defer fake.getBlockchainInfoMutex.RUnlock() + fake.getBlocksIteratorMutex.RLock() + defer fake.getBlocksIteratorMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *Ledger) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/internal/pkg/gateway/mocks/ledgerprovider.go b/internal/pkg/gateway/mocks/ledgerprovider.go new file mode 100644 index 00000000000..bfbaf33ee08 --- /dev/null +++ b/internal/pkg/gateway/mocks/ledgerprovider.go @@ -0,0 +1,114 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric/common/ledger" +) + +type LedgerProvider struct { + LedgerStub func(string) (ledger.Ledger, error) + ledgerMutex sync.RWMutex + ledgerArgsForCall []struct { + arg1 string + } + ledgerReturns struct { + result1 ledger.Ledger + result2 error + } + ledgerReturnsOnCall map[int]struct { + result1 ledger.Ledger + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *LedgerProvider) Ledger(arg1 string) (ledger.Ledger, error) { + fake.ledgerMutex.Lock() + ret, specificReturn := fake.ledgerReturnsOnCall[len(fake.ledgerArgsForCall)] + fake.ledgerArgsForCall = append(fake.ledgerArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.LedgerStub + fakeReturns := fake.ledgerReturns + fake.recordInvocation("Ledger", []interface{}{arg1}) + fake.ledgerMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *LedgerProvider) LedgerCallCount() int { + fake.ledgerMutex.RLock() + defer fake.ledgerMutex.RUnlock() + return len(fake.ledgerArgsForCall) +} + +func (fake *LedgerProvider) LedgerCalls(stub func(string) (ledger.Ledger, error)) { + fake.ledgerMutex.Lock() + defer fake.ledgerMutex.Unlock() + fake.LedgerStub = stub +} + +func (fake *LedgerProvider) LedgerArgsForCall(i int) string { + fake.ledgerMutex.RLock() + defer fake.ledgerMutex.RUnlock() + argsForCall := fake.ledgerArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *LedgerProvider) LedgerReturns(result1 ledger.Ledger, result2 error) { + fake.ledgerMutex.Lock() + defer fake.ledgerMutex.Unlock() + fake.LedgerStub = nil + fake.ledgerReturns = struct { + result1 ledger.Ledger + result2 error + }{result1, result2} +} + +func (fake *LedgerProvider) LedgerReturnsOnCall(i int, result1 ledger.Ledger, result2 error) { + fake.ledgerMutex.Lock() + defer fake.ledgerMutex.Unlock() + fake.LedgerStub = nil + if fake.ledgerReturnsOnCall == nil { + fake.ledgerReturnsOnCall = make(map[int]struct { + result1 ledger.Ledger + result2 error + }) + } + fake.ledgerReturnsOnCall[i] = struct { + result1 ledger.Ledger + result2 error + }{result1, result2} +} + +func (fake *LedgerProvider) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.ledgerMutex.RLock() + defer fake.ledgerMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *LedgerProvider) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/internal/pkg/gateway/mocks/resultsiterator.go b/internal/pkg/gateway/mocks/resultsiterator.go new file mode 100644 index 00000000000..a8b0007f4ff --- /dev/null +++ b/internal/pkg/gateway/mocks/resultsiterator.go @@ -0,0 +1,135 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric/common/ledger" +) + +type ResultsIterator struct { + CloseStub func() + closeMutex sync.RWMutex + closeArgsForCall []struct { + } + NextStub func() (ledger.QueryResult, error) + nextMutex sync.RWMutex + nextArgsForCall []struct { + } + nextReturns struct { + result1 ledger.QueryResult + result2 error + } + nextReturnsOnCall map[int]struct { + result1 ledger.QueryResult + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *ResultsIterator) Close() { + fake.closeMutex.Lock() + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + fake.CloseStub() + } +} + +func (fake *ResultsIterator) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *ResultsIterator) CloseCalls(stub func()) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + +func (fake *ResultsIterator) Next() (ledger.QueryResult, error) { + fake.nextMutex.Lock() + ret, specificReturn := fake.nextReturnsOnCall[len(fake.nextArgsForCall)] + fake.nextArgsForCall = append(fake.nextArgsForCall, struct { + }{}) + stub := fake.NextStub + fakeReturns := fake.nextReturns + fake.recordInvocation("Next", []interface{}{}) + fake.nextMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *ResultsIterator) NextCallCount() int { + fake.nextMutex.RLock() + defer fake.nextMutex.RUnlock() + return len(fake.nextArgsForCall) +} + +func (fake *ResultsIterator) NextCalls(stub func() (ledger.QueryResult, error)) { + fake.nextMutex.Lock() + defer fake.nextMutex.Unlock() + fake.NextStub = stub +} + +func (fake *ResultsIterator) NextReturns(result1 ledger.QueryResult, result2 error) { + fake.nextMutex.Lock() + defer fake.nextMutex.Unlock() + fake.NextStub = nil + fake.nextReturns = struct { + result1 ledger.QueryResult + result2 error + }{result1, result2} +} + +func (fake *ResultsIterator) NextReturnsOnCall(i int, result1 ledger.QueryResult, result2 error) { + fake.nextMutex.Lock() + defer fake.nextMutex.Unlock() + fake.NextStub = nil + if fake.nextReturnsOnCall == nil { + fake.nextReturnsOnCall = make(map[int]struct { + result1 ledger.QueryResult + result2 error + }) + } + fake.nextReturnsOnCall[i] = struct { + result1 ledger.QueryResult + result2 error + }{result1, result2} +} + +func (fake *ResultsIterator) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + fake.nextMutex.RLock() + defer fake.nextMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *ResultsIterator) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/internal/pkg/gateway/peeradapter.go b/internal/pkg/gateway/peeradapter.go index 8ba43d50cb4..ad75b17ab19 100644 --- a/internal/pkg/gateway/peeradapter.go +++ b/internal/pkg/gateway/peeradapter.go @@ -8,18 +8,19 @@ package gateway import ( peerproto "github.com/hyperledger/fabric-protos-go/peer" - "github.com/hyperledger/fabric/core/ledger" + commonledger "github.com/hyperledger/fabric/common/ledger" + coreledger "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/peer" "github.com/pkg/errors" ) // peerAdapter presents a small piece of the Peer in a form that can be easily used (and mocked) by the gateway's -// transaction status checking. +// transaction status checking and eventing. type peerAdapter struct { Peer *peer.Peer } -func (adapter *peerAdapter) CommitNotifications(done <-chan struct{}, channelName string) (<-chan *ledger.CommitNotification, error) { +func (adapter *peerAdapter) CommitNotifications(done <-chan struct{}, channelName string) (<-chan *coreledger.CommitNotification, error) { channel, err := adapter.channel(channelName) if err != nil { return nil, err @@ -34,9 +35,7 @@ func (adapter *peerAdapter) TransactionStatus(channelName string, transactionID return 0, 0, err } - ledger := channel.Ledger() - - status, blockNumber, err := ledger.GetTxValidationCodeByTxID(transactionID) + status, blockNumber, err := channel.Ledger().GetTxValidationCodeByTxID(transactionID) if err != nil { return 0, 0, err } @@ -44,6 +43,15 @@ func (adapter *peerAdapter) TransactionStatus(channelName string, transactionID return status, blockNumber, nil } +func (adapter *peerAdapter) Ledger(channelName string) (commonledger.Ledger, error) { + channel, err := adapter.channel(channelName) + if err != nil { + return nil, err + } + + return channel.Ledger(), nil +} + func (adapter *peerAdapter) channel(name string) (*peer.Channel, error) { channel := adapter.Peer.Channel(name) if channel == nil { diff --git a/internal/pkg/gateway/peeradapter_test.go b/internal/pkg/gateway/peeradapter_test.go index 9286ffa1f3d..d028cd9f327 100644 --- a/internal/pkg/gateway/peeradapter_test.go +++ b/internal/pkg/gateway/peeradapter_test.go @@ -37,4 +37,16 @@ func TestPeerAdapter(t *testing.T) { require.ErrorContains(t, err, "CHANNEL") }) }) + + t.Run("Ledger", func(t *testing.T) { + t.Run("returns error when channel does not exist", func(t *testing.T) { + adapter := &peerAdapter{ + Peer: &peer.Peer{}, + } + + _, err := adapter.Ledger("CHANNEL") + + require.ErrorContains(t, err, "CHANNEL") + }) + }) }