Skip to content

Commit

Permalink
Implement chaincode event replay for Fabric Gateway (#2896)
Browse files Browse the repository at this point in the history
* Implement chaincode event replay for Fabric Gateway

Signed-off-by: Mark S. Lewis <mark_lewis@uk.ibm.com>

* Default to next commit as start position for ChaincodeEvents

Signed-off-by: Mark S. Lewis <mark_lewis@uk.ibm.com>
  • Loading branch information
bestbeforetoday authored Sep 7, 2021
1 parent dd539d3 commit beb8f5d
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 80 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/hyperledger/fabric-chaincode-go v0.0.0-20201119163726-f8ef75b17719
github.com/hyperledger/fabric-config v0.1.0
github.com/hyperledger/fabric-lib-go v1.0.0
github.com/hyperledger/fabric-protos-go v0.0.0-20210720123151-f0dc3e2a0871
github.com/hyperledger/fabric-protos-go v0.0.0-20210903093419-e9e1b9f969d8
github.com/kr/pretty v0.2.1
github.com/magiconair/properties v1.8.1 // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ github.com/hyperledger/fabric-lib-go v1.0.0 h1:UL1w7c9LvHZUSkIvHTDGklxFv2kTeva1Q
github.com/hyperledger/fabric-lib-go v1.0.0/go.mod h1:H362nMlunurmHwkYqR5uHL2UDWbQdbfz74n8kbCFsqc=
github.com/hyperledger/fabric-protos-go v0.0.0-20190919234611-2a87503ac7c9/go.mod h1:xVYTjK4DtZRBxZ2D9aE4y6AbLaPwue2o/criQyQbVD0=
github.com/hyperledger/fabric-protos-go v0.0.0-20200424173316-dd554ba3746e/go.mod h1:xVYTjK4DtZRBxZ2D9aE4y6AbLaPwue2o/criQyQbVD0=
github.com/hyperledger/fabric-protos-go v0.0.0-20210720123151-f0dc3e2a0871 h1:d7do07Q4LaOFAEWceRwUwVDdcfx3BdLeZYyUGtbHfRk=
github.com/hyperledger/fabric-protos-go v0.0.0-20210720123151-f0dc3e2a0871/go.mod h1:xVYTjK4DtZRBxZ2D9aE4y6AbLaPwue2o/criQyQbVD0=
github.com/hyperledger/fabric-protos-go v0.0.0-20210903093419-e9e1b9f969d8 h1:6Qt3MdBqww+aJCDaUDIOhA6EcDhQln5l8wqBXehM96A=
github.com/hyperledger/fabric-protos-go v0.0.0-20210903093419-e9e1b9f969d8/go.mod h1:xVYTjK4DtZRBxZ2D9aE4y6AbLaPwue2o/criQyQbVD0=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
Expand Down
146 changes: 134 additions & 12 deletions integration/gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/gateway"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/integration/nwo"
"github.com/hyperledger/fabric/protoutil"
Expand Down Expand Up @@ -204,6 +205,38 @@ var _ = Describe("GatewayService", func() {
return gatewayClient.CommitStatus(ctx, signedStatusRequest)
}

chaincodeEvents := func(
ctx context.Context,
startPosition *orderer.SeekPosition,
identity func() ([]byte, error),
sign func(msg []byte) ([]byte, error),
) (gateway.Gateway_ChaincodeEventsClient, error) {
identityBytes, err := identity()
Expect(err).NotTo(HaveOccurred())

request := &gateway.ChaincodeEventsRequest{
ChannelId: "testchannel",
ChaincodeId: "gatewaycc",
Identity: identityBytes,
}
if startPosition != nil {
request.StartPosition = startPosition
}

requestBytes, err := proto.Marshal(request)
Expect(err).NotTo(HaveOccurred())

signature, err := sign(requestBytes)
Expect(err).NotTo(HaveOccurred())

signedRequest := &gateway.SignedChaincodeEventsRequest{
Request: requestBytes,
Signature: signature,
}

return gatewayClient.ChaincodeEvents(ctx, signedRequest)
}

Describe("Evaluate", func() {
It("should respond with the expected result", func() {
proposedTransaction, transactionID := NewProposedTransaction(signingIdentity, "testchannel", "gatewaycc", "respond", nil, []byte("200"), []byte("conga message"), []byte("conga payload"))
Expand Down Expand Up @@ -244,10 +277,10 @@ var _ = Describe("GatewayService", func() {
Describe("CommitStatus", func() {
It("should respond with status of submitted transaction", func() {
_, transactionID := submitTransaction("respond", []byte("200"), []byte("conga message"), []byte("conga payload"))
status, err := commitStatus(transactionID, signingIdentity.Serialize, signingIdentity.Sign)
statusResult, err := commitStatus(transactionID, signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

Expect(status.Result).To(Equal(peer.TxValidationCode_VALID))
Expect(statusResult.Result).To(Equal(peer.TxValidationCode_VALID))
})

It("should respond with block number", func() {
Expand Down Expand Up @@ -287,30 +320,73 @@ var _ = Describe("GatewayService", func() {

Describe("ChaincodeEvents", func() {
It("should respond with emitted chaincode events", func() {
identityBytes, err := signingIdentity.Serialize()
eventCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

startPosition := &orderer.SeekPosition{
Type: &orderer.SeekPosition_NextCommit{
NextCommit: &orderer.SeekNextCommit{},
},
}

eventsClient, err := chaincodeEvents(eventCtx, startPosition, signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

_, transactionID := submitTransaction("event", []byte("EVENT_NAME"), []byte("EVENT_PAYLOAD"))

event, err := eventsClient.Recv()
Expect(err).NotTo(HaveOccurred())

request := &gateway.ChaincodeEventsRequest{
ChannelId: "testchannel",
Expect(event.Events).To(HaveLen(1), "number of events")
expectedEvent := &peer.ChaincodeEvent{
ChaincodeId: "gatewaycc",
Identity: identityBytes,
TxId: transactionID,
EventName: "EVENT_NAME",
Payload: []byte("EVENT_PAYLOAD"),
}
Expect(proto.Equal(event.Events[0], expectedEvent)).To(BeTrue(), "Expected\n\t%#v\nto proto.Equal\n\t%#v", event.Events[0], expectedEvent)
})

It("should respond with replayed chaincode events", func() {
_, transactionID := submitTransaction("event", []byte("EVENT_NAME"), []byte("EVENT_PAYLOAD"))
statusResult, err := commitStatus(transactionID, signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

eventCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

startPosition := &orderer.SeekPosition{
Type: &orderer.SeekPosition_Specified{
Specified: &orderer.SeekSpecified{
Number: statusResult.BlockNumber,
},
},
}

requestBytes, err := proto.Marshal(request)
eventsClient, err := chaincodeEvents(eventCtx, startPosition, signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

signature, err := signingIdentity.Sign(requestBytes)
event, err := eventsClient.Recv()
Expect(err).NotTo(HaveOccurred())

signedRequest := &gateway.SignedChaincodeEventsRequest{
Request: requestBytes,
Signature: signature,
Expect(event.BlockNumber).To(Equal(statusResult.BlockNumber), "block number")
Expect(event.Events).To(HaveLen(1), "number of events")
expectedEvent := &peer.ChaincodeEvent{
ChaincodeId: "gatewaycc",
TxId: transactionID,
EventName: "EVENT_NAME",
Payload: []byte("EVENT_PAYLOAD"),
}
Expect(proto.Equal(event.Events[0], expectedEvent)).To(BeTrue(), "Expected\n\t%#v\nto proto.Equal\n\t%#v", event.Events[0], expectedEvent)
})

It("should default to next commit if start position not specified", func() {
eventCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

eventsClient, err := gatewayClient.ChaincodeEvents(eventCtx, signedRequest)
var startPosition *orderer.SeekPosition

eventsClient, err := chaincodeEvents(eventCtx, startPosition, signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

_, transactionID := submitTransaction("event", []byte("EVENT_NAME"), []byte("EVENT_PAYLOAD"))
Expand All @@ -327,5 +403,51 @@ var _ = Describe("GatewayService", func() {
}
Expect(proto.Equal(event.Events[0], expectedEvent)).To(BeTrue(), "Expected\n\t%#v\nto proto.Equal\n\t%#v", event.Events[0], expectedEvent)
})

It("should fail on unauthorized identity", func() {
badIdentity := network.OrdererUserSigner(network.Orderer("orderer"), "Admin")

eventCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

startPosition := &orderer.SeekPosition{
Type: &orderer.SeekPosition_NextCommit{
NextCommit: &orderer.SeekNextCommit{},
},
}

eventsClient, err := chaincodeEvents(eventCtx, startPosition, badIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

event, err := eventsClient.Recv()
Expect(err).To(HaveOccurred(), "expected error but got event: %v", event)

grpcErr, _ := status.FromError(err)
Expect(grpcErr.Code()).To(Equal(codes.PermissionDenied))
})

It("should fail on bad signature", func() {
badSign := func(digest []byte) ([]byte, error) {
return signingIdentity.Sign([]byte("WRONG"))
}

eventCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

startPosition := &orderer.SeekPosition{
Type: &orderer.SeekPosition_NextCommit{
NextCommit: &orderer.SeekNextCommit{},
},
}

eventsClient, err := chaincodeEvents(eventCtx, startPosition, signingIdentity.Serialize, badSign)
Expect(err).NotTo(HaveOccurred())

event, err := eventsClient.Recv()
Expect(err).To(HaveOccurred(), "expected error but got event: %v", event)

grpcErr, _ := status.FromError(err)
Expect(grpcErr.Code()).To(Equal(codes.PermissionDenied))
})
})
})
33 changes: 27 additions & 6 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
gp "github.com/hyperledger/fabric-protos-go/gateway"
ab "github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/core/aclmgmt/resources"
"github.com/hyperledger/fabric/internal/pkg/gateway/event"
Expand Down Expand Up @@ -356,17 +357,37 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest
return status.Error(codes.PermissionDenied, err.Error())
}

ledger, err := gs.ledgerProvider.Ledger(request.ChannelId)
ledger, err := gs.ledgerProvider.Ledger(request.GetChannelId())
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

ledgerInfo, err := ledger.GetBlockchainInfo()
if err != nil {
return status.Error(codes.Unavailable, err.Error())
var startBlock uint64
switch seek := request.GetStartPosition().GetType().(type) {
case nil:
ledgerInfo, err := ledger.GetBlockchainInfo()
if err != nil {
return status.Error(codes.Unavailable, err.Error())
}

startBlock = ledgerInfo.GetHeight()

case *ab.SeekPosition_NextCommit:
ledgerInfo, err := ledger.GetBlockchainInfo()
if err != nil {
return status.Error(codes.Unavailable, err.Error())
}

startBlock = ledgerInfo.GetHeight()

case *ab.SeekPosition_Specified:
startBlock = seek.Specified.GetNumber()

default:
return status.Errorf(codes.InvalidArgument, "invalid start position type: %T", seek)
}

ledgerIter, err := ledger.GetBlocksIterator(ledgerInfo.Height)
ledgerIter, err := ledger.GetBlocksIterator(startBlock)
if err != nil {
return status.Error(codes.Unavailable, err.Error())
}
Expand All @@ -383,7 +404,7 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest
var matchingEvents []*peer.ChaincodeEvent

for _, event := range response.Events {
if event.GetChaincodeId() == request.ChaincodeId {
if event.GetChaincodeId() == request.GetChaincodeId() {
matchingEvents = append(matchingEvents, event)
}
}
Expand Down
62 changes: 61 additions & 1 deletion internal/pkg/gateway/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ type testDef struct {
transientData map[string][]byte
interest *peer.ChaincodeInterest
blocks []*cp.Block
startPosition *ab.SeekPosition
}

type preparedTest struct {
Expand Down Expand Up @@ -1176,7 +1177,51 @@ func TestChaincodeEvents(t *testing.T) {
},
},
{
name: "uses block height as default start block",
name: "uses block height as start block if next commit is specified as start position",
blocks: []*cp.Block{
block101Proto,
},
postSetup: func(t *testing.T, test *preparedTest) {
ledgerInfo := &cp.BlockchainInfo{
Height: 101,
}
test.ledger.GetBlockchainInfoReturns(ledgerInfo, nil)
},
startPosition: &ab.SeekPosition{
Type: &ab.SeekPosition_NextCommit{
NextCommit: &ab.SeekNextCommit{},
},
},
postTest: func(t *testing.T, test *preparedTest) {
require.Equal(t, 1, test.ledger.GetBlocksIteratorCallCount())
require.EqualValues(t, 101, test.ledger.GetBlocksIteratorArgsForCall(0))
},
},
{
name: "uses specified start block",
blocks: []*cp.Block{
block101Proto,
},
postSetup: func(t *testing.T, test *preparedTest) {
ledgerInfo := &cp.BlockchainInfo{
Height: 101,
}
test.ledger.GetBlockchainInfoReturns(ledgerInfo, nil)
},
startPosition: &ab.SeekPosition{
Type: &ab.SeekPosition_Specified{
Specified: &ab.SeekSpecified{
Number: 99,
},
},
},
postTest: func(t *testing.T, test *preparedTest) {
require.Equal(t, 1, test.ledger.GetBlocksIteratorCallCount())
require.EqualValues(t, 99, test.ledger.GetBlocksIteratorArgsForCall(0))
},
},
{
name: "defaults to next commit if start position not specified",
blocks: []*cp.Block{
block101Proto,
},
Expand All @@ -1191,6 +1236,18 @@ func TestChaincodeEvents(t *testing.T) {
require.EqualValues(t, 101, test.ledger.GetBlocksIteratorArgsForCall(0))
},
},
{
name: "returns error for unsupported start position type",
blocks: []*cp.Block{
block101Proto,
},
startPosition: &ab.SeekPosition{
Type: &ab.SeekPosition_Oldest{
Oldest: &ab.SeekOldest{},
},
},
errString: "rpc error: code = InvalidArgument desc = invalid start position type: *orderer.SeekPosition_Oldest",
},
{
name: "returns error obtaining ledger iterator",
blocks: []*cp.Block{
Expand Down Expand Up @@ -1245,6 +1302,9 @@ func TestChaincodeEvents(t *testing.T) {
Identity: tt.identity,
ChaincodeId: testChaincode,
}
if tt.startPosition != nil {
request.StartPosition = tt.startPosition
}
requestBytes, err := proto.Marshal(request)
require.NoError(t, err)

Expand Down
Loading

0 comments on commit beb8f5d

Please sign in to comment.