Skip to content

Commit

Permalink
Implement chaincode event replay for Fabric Gateway
Browse files Browse the repository at this point in the history
Signed-off-by: Mark S. Lewis <mark_lewis@uk.ibm.com>
  • Loading branch information
bestbeforetoday committed Sep 7, 2021
1 parent dd539d3 commit 7747938
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 85 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/hyperledger/fabric-chaincode-go v0.0.0-20201119163726-f8ef75b17719
github.com/hyperledger/fabric-config v0.1.0
github.com/hyperledger/fabric-lib-go v1.0.0
github.com/hyperledger/fabric-protos-go v0.0.0-20210720123151-f0dc3e2a0871
github.com/hyperledger/fabric-protos-go v0.0.0-20210903093419-e9e1b9f969d8
github.com/kr/pretty v0.2.1
github.com/magiconair/properties v1.8.1 // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ github.com/hyperledger/fabric-lib-go v1.0.0 h1:UL1w7c9LvHZUSkIvHTDGklxFv2kTeva1Q
github.com/hyperledger/fabric-lib-go v1.0.0/go.mod h1:H362nMlunurmHwkYqR5uHL2UDWbQdbfz74n8kbCFsqc=
github.com/hyperledger/fabric-protos-go v0.0.0-20190919234611-2a87503ac7c9/go.mod h1:xVYTjK4DtZRBxZ2D9aE4y6AbLaPwue2o/criQyQbVD0=
github.com/hyperledger/fabric-protos-go v0.0.0-20200424173316-dd554ba3746e/go.mod h1:xVYTjK4DtZRBxZ2D9aE4y6AbLaPwue2o/criQyQbVD0=
github.com/hyperledger/fabric-protos-go v0.0.0-20210720123151-f0dc3e2a0871 h1:d7do07Q4LaOFAEWceRwUwVDdcfx3BdLeZYyUGtbHfRk=
github.com/hyperledger/fabric-protos-go v0.0.0-20210720123151-f0dc3e2a0871/go.mod h1:xVYTjK4DtZRBxZ2D9aE4y6AbLaPwue2o/criQyQbVD0=
github.com/hyperledger/fabric-protos-go v0.0.0-20210903093419-e9e1b9f969d8 h1:6Qt3MdBqww+aJCDaUDIOhA6EcDhQln5l8wqBXehM96A=
github.com/hyperledger/fabric-protos-go v0.0.0-20210903093419-e9e1b9f969d8/go.mod h1:xVYTjK4DtZRBxZ2D9aE4y6AbLaPwue2o/criQyQbVD0=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
Expand Down
127 changes: 111 additions & 16 deletions integration/gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/gateway"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/integration/nwo"
"github.com/hyperledger/fabric/protoutil"
Expand Down Expand Up @@ -204,6 +205,36 @@ var _ = Describe("GatewayService", func() {
return gatewayClient.CommitStatus(ctx, signedStatusRequest)
}

chaincodeEvents := func(
ctx context.Context,
startPosition *orderer.SeekPosition,
identity func() ([]byte, error),
sign func(msg []byte) ([]byte, error),
) (gateway.Gateway_ChaincodeEventsClient, error) {
identityBytes, err := identity()
Expect(err).NotTo(HaveOccurred())

request := &gateway.ChaincodeEventsRequest{
ChannelId: "testchannel",
ChaincodeId: "gatewaycc",
Identity: identityBytes,
StartPosition: startPosition,
}

requestBytes, err := proto.Marshal(request)
Expect(err).NotTo(HaveOccurred())

signature, err := sign(requestBytes)
Expect(err).NotTo(HaveOccurred())

signedRequest := &gateway.SignedChaincodeEventsRequest{
Request: requestBytes,
Signature: signature,
}

return gatewayClient.ChaincodeEvents(ctx, signedRequest)
}

Describe("Evaluate", func() {
It("should respond with the expected result", func() {
proposedTransaction, transactionID := NewProposedTransaction(signingIdentity, "testchannel", "gatewaycc", "respond", nil, []byte("200"), []byte("conga message"), []byte("conga payload"))
Expand Down Expand Up @@ -244,10 +275,10 @@ var _ = Describe("GatewayService", func() {
Describe("CommitStatus", func() {
It("should respond with status of submitted transaction", func() {
_, transactionID := submitTransaction("respond", []byte("200"), []byte("conga message"), []byte("conga payload"))
status, err := commitStatus(transactionID, signingIdentity.Serialize, signingIdentity.Sign)
statusResult, err := commitStatus(transactionID, signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

Expect(status.Result).To(Equal(peer.TxValidationCode_VALID))
Expect(statusResult.Result).To(Equal(peer.TxValidationCode_VALID))
})

It("should respond with block number", func() {
Expand Down Expand Up @@ -287,33 +318,51 @@ var _ = Describe("GatewayService", func() {

Describe("ChaincodeEvents", func() {
It("should respond with emitted chaincode events", func() {
identityBytes, err := signingIdentity.Serialize()
Expect(err).NotTo(HaveOccurred())
eventCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

request := &gateway.ChaincodeEventsRequest{
ChannelId: "testchannel",
ChaincodeId: "gatewaycc",
Identity: identityBytes,
startPosition := &orderer.SeekPosition{
Type: &orderer.SeekPosition_NextCommit{
NextCommit: &orderer.SeekNextCommit{},
},
}

requestBytes, err := proto.Marshal(request)
eventsClient, err := chaincodeEvents(eventCtx, startPosition, signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

signature, err := signingIdentity.Sign(requestBytes)
_, transactionID := submitTransaction("event", []byte("EVENT_NAME"), []byte("EVENT_PAYLOAD"))

event, err := eventsClient.Recv()
Expect(err).NotTo(HaveOccurred())

signedRequest := &gateway.SignedChaincodeEventsRequest{
Request: requestBytes,
Signature: signature,
Expect(event.Events).To(HaveLen(1), "number of events")
expectedEvent := &peer.ChaincodeEvent{
ChaincodeId: "gatewaycc",
TxId: transactionID,
EventName: "EVENT_NAME",
Payload: []byte("EVENT_PAYLOAD"),
}
Expect(proto.Equal(event.Events[0], expectedEvent)).To(BeTrue(), "Expected\n\t%#v\nto proto.Equal\n\t%#v", event.Events[0], expectedEvent)
})

It("should respond with replayed chaincode events", func() {
_, transactionID := submitTransaction("event", []byte("EVENT_NAME"), []byte("EVENT_PAYLOAD"))
statusResult, err := commitStatus(transactionID, signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

eventCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

eventsClient, err := gatewayClient.ChaincodeEvents(eventCtx, signedRequest)
Expect(err).NotTo(HaveOccurred())
startPosition := &orderer.SeekPosition{
Type: &orderer.SeekPosition_Specified{
Specified: &orderer.SeekSpecified{
Number: statusResult.BlockNumber,
},
},
}

_, transactionID := submitTransaction("event", []byte("EVENT_NAME"), []byte("EVENT_PAYLOAD"))
eventsClient, err := chaincodeEvents(eventCtx, startPosition, signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

event, err := eventsClient.Recv()
Expect(err).NotTo(HaveOccurred())
Expand All @@ -327,5 +376,51 @@ var _ = Describe("GatewayService", func() {
}
Expect(proto.Equal(event.Events[0], expectedEvent)).To(BeTrue(), "Expected\n\t%#v\nto proto.Equal\n\t%#v", event.Events[0], expectedEvent)
})

It("should fail on unauthorized identity", func() {
badIdentity := network.OrdererUserSigner(network.Orderer("orderer"), "Admin")

eventCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

startPosition := &orderer.SeekPosition{
Type: &orderer.SeekPosition_NextCommit{
NextCommit: &orderer.SeekNextCommit{},
},
}

eventsClient, err := chaincodeEvents(eventCtx, startPosition, badIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

event, err := eventsClient.Recv()
Expect(err).To(HaveOccurred(), "expected error but got event: %v", event)

grpcErr, _ := status.FromError(err)
Expect(grpcErr.Code()).To(Equal(codes.PermissionDenied))
})

It("should fail on bad signature", func() {
badSign := func(digest []byte) ([]byte, error) {
return signingIdentity.Sign([]byte("WRONG"))
}

eventCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

startPosition := &orderer.SeekPosition{
Type: &orderer.SeekPosition_NextCommit{
NextCommit: &orderer.SeekNextCommit{},
},
}

eventsClient, err := chaincodeEvents(eventCtx, startPosition, signingIdentity.Serialize, badSign)
Expect(err).NotTo(HaveOccurred())

event, err := eventsClient.Recv()
Expect(err).To(HaveOccurred(), "expected error but got event: %v", event)

grpcErr, _ := status.FromError(err)
Expect(grpcErr.Code()).To(Equal(codes.PermissionDenied))
})
})
})
19 changes: 15 additions & 4 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
gp "github.com/hyperledger/fabric-protos-go/gateway"
ab "github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/core/aclmgmt/resources"
"github.com/hyperledger/fabric/internal/pkg/gateway/event"
Expand Down Expand Up @@ -361,12 +362,22 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest
return status.Error(codes.InvalidArgument, err.Error())
}

ledgerInfo, err := ledger.GetBlockchainInfo()
if err != nil {
return status.Error(codes.Unavailable, err.Error())
var startBlock uint64
switch seek := request.StartPosition.Type.(type) {
case *ab.SeekPosition_NextCommit:
ledgerInfo, err := ledger.GetBlockchainInfo()
if err != nil {
return status.Error(codes.Unavailable, err.Error())
}

startBlock = ledgerInfo.GetHeight()
case *ab.SeekPosition_Specified:
startBlock = seek.Specified.GetNumber()
default:
return status.Errorf(codes.InvalidArgument, "invalid start position type: %T", seek)
}

ledgerIter, err := ledger.GetBlocksIterator(ledgerInfo.Height)
ledgerIter, err := ledger.GetBlocksIterator(startBlock)
if err != nil {
return status.Error(codes.Unavailable, err.Error())
}
Expand Down
57 changes: 53 additions & 4 deletions internal/pkg/gateway/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ type testDef struct {
transientData map[string][]byte
interest *peer.ChaincodeInterest
blocks []*cp.Block
startPosition *ab.SeekPosition
}

type preparedTest struct {
Expand Down Expand Up @@ -1176,7 +1177,7 @@ func TestChaincodeEvents(t *testing.T) {
},
},
{
name: "uses block height as default start block",
name: "uses block height as start block if next commit is specified as start position",
blocks: []*cp.Block{
block101Proto,
},
Expand All @@ -1186,11 +1187,51 @@ func TestChaincodeEvents(t *testing.T) {
}
test.ledger.GetBlockchainInfoReturns(ledgerInfo, nil)
},
startPosition: &ab.SeekPosition{
Type: &ab.SeekPosition_NextCommit{
NextCommit: &ab.SeekNextCommit{},
},
},
postTest: func(t *testing.T, test *preparedTest) {
require.Equal(t, 1, test.ledger.GetBlocksIteratorCallCount())
require.EqualValues(t, 101, test.ledger.GetBlocksIteratorArgsForCall(0))
},
},
{
name: "uses specified start block",
blocks: []*cp.Block{
block101Proto,
},
postSetup: func(t *testing.T, test *preparedTest) {
ledgerInfo := &cp.BlockchainInfo{
Height: 101,
}
test.ledger.GetBlockchainInfoReturns(ledgerInfo, nil)
},
startPosition: &ab.SeekPosition{
Type: &ab.SeekPosition_Specified{
Specified: &ab.SeekSpecified{
Number: 99,
},
},
},
postTest: func(t *testing.T, test *preparedTest) {
require.Equal(t, 1, test.ledger.GetBlocksIteratorCallCount())
require.EqualValues(t, 99, test.ledger.GetBlocksIteratorArgsForCall(0))
},
},
{
name: "returns error for unsupported start position type",
blocks: []*cp.Block{
block101Proto,
},
startPosition: &ab.SeekPosition{
Type: &ab.SeekPosition_Oldest{
Oldest: &ab.SeekOldest{},
},
},
errString: "rpc error: code = InvalidArgument desc = invalid start position type: *orderer.SeekPosition_Oldest",
},
{
name: "returns error obtaining ledger iterator",
blocks: []*cp.Block{
Expand Down Expand Up @@ -1241,9 +1282,17 @@ func TestChaincodeEvents(t *testing.T) {
test := prepareTest(t, &tt)

request := &pb.ChaincodeEventsRequest{
ChannelId: testChannel,
Identity: tt.identity,
ChaincodeId: testChaincode,
ChannelId: testChannel,
Identity: tt.identity,
ChaincodeId: testChaincode,
StartPosition: tt.startPosition,
}
if request.StartPosition == nil {
request.StartPosition = &ab.SeekPosition{
Type: &ab.SeekPosition_NextCommit{
NextCommit: &ab.SeekNextCommit{},
},
}
}
requestBytes, err := proto.Marshal(request)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 7747938

Please sign in to comment.