Skip to content

Commit

Permalink
[FABG-748] fix for lost events in CI
Browse files Browse the repository at this point in the history
- events lost due to MVCC_READ_CONFLICT are handled
- added new test to show the behavior of chaincode events
during MVCC_READ_CONFLICTS
- example cc 'move' functions are used only where necessary, instead
new 'set' to be used to avoid MVCC_READ_CONFLICTS


Change-Id: Id38fa45b9922e8e8e1ad45b4415f1d40a28d82fc
Signed-off-by: Sudesh Shetty <sudesh.shetty@securekey.com>
  • Loading branch information
sudeshrshetty committed Aug 28, 2018
1 parent 0628c3f commit 02c1c6f
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 13 deletions.
5 changes: 5 additions & 0 deletions pkg/fab/events/service/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,11 +556,16 @@ func (ed *Dispatcher) publishFilteredBlockEvents(fblock *pb.FilteredBlock, sourc
if txActions == nil {
continue
}
if len(txActions.ChaincodeActions) == 0 {
logger.Debugf("No chaincode action found for TxID[%s], block[%d], source URL[%s]", tx.Txid, fblock.Number, sourceURL)
}
for _, action := range txActions.ChaincodeActions {
if action.ChaincodeEvent != nil {
ed.publishCCEvents(action.ChaincodeEvent, fblock.Number, sourceURL)
}
}
} else {
logger.Debugf("Cannot publish CCEvents for block[%d] and source URL[%s] since Tx Validation Code[%d] is not valid", fblock.Number, sourceURL, tx.TxValidationCode)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,40 @@ func (t *SimpleChaincode) Query(stub shim.ChaincodeStubInterface) pb.Response {
return shim.Error("Unknown supported call")
}

//set sets given key-value in state
func (t *SimpleChaincode) set(stub shim.ChaincodeStubInterface, args []string) pb.Response {
var err error

if len(args) < 3 {
return shim.Error("Incorrect number of arguments. Expecting a key and a value")
}

// Initialize the chaincode
key := args[1]
value := args[2]
eventID := "testEvent"
if len(args) >= 4 {
eventID = args[3]
}

logger.Debugf("Setting value for key[%s]", key)

// Write the state to the ledger
err = stub.PutState(key, []byte(value))
if err != nil {
logger.Errorf("Failed to set value for key[%s] : ", key, err)
return shim.Error(err.Error())
}

err = stub.SetEvent(eventID, []byte("Test Payload"))
if err != nil {
logger.Errorf("Failed to set event for key[%s] : ", key, err)
return shim.Error(err.Error())
}

return shim.Success(nil)
}

// Invoke ...
// Transaction makes payment of X units from A to B
func (t *SimpleChaincode) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
Expand Down Expand Up @@ -138,6 +172,12 @@ func (t *SimpleChaincode) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
// queries an entity state
return t.query(stub, args)
}

if args[0] == "set" {
// setting an entity state
return t.set(stub, args)
}

if args[0] == "move" {
eventID := "testEvent"
if len(args) >= 5 {
Expand Down
5 changes: 5 additions & 0 deletions test/integration/base_test_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func ExampleCCTxArgs() [][]byte {
return txArgs
}

// ExampleCCTxRandomSetArgs returns example cc set args with random key-value pairs
func ExampleCCTxRandomSetArgs() [][]byte {
return [][]byte{[]byte("set"), []byte(GenerateRandomID()), []byte(GenerateRandomID())}
}

//ExampleCCInitArgs returns example cc initialization args
func ExampleCCInitArgs() [][]byte {
return initArgs
Expand Down
10 changes: 5 additions & 5 deletions test/integration/pkg/client/channel/channel_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestCCToCC(t *testing.T) {
channel.Request{
ChaincodeID: cc1ID,
Fcn: "invokecc",
Args: [][]byte{[]byte(cc2ID), []byte(`{"Args":["invoke","move","a","b","1"]}`)},
Args: [][]byte{[]byte(cc2ID), []byte(`{"Args":["invoke","set","x1","y1"]}`)},
},
channel.WithRetry(retry.DefaultChannelOpts),
)
Expand All @@ -223,7 +223,7 @@ func TestCCToCC(t *testing.T) {
channel.Request{
ChaincodeID: cc1ID,
Fcn: "invokecc",
Args: [][]byte{[]byte(cc2ID), []byte(`{"Args":["invoke","move","a","b","1"]}`)},
Args: [][]byte{[]byte(cc2ID), []byte(`{"Args":["invoke","set","x1","y1"]}`)},
InvocationChain: []*fab.ChaincodeCall{
{ID: cc2ID},
},
Expand All @@ -241,7 +241,7 @@ func TestCCToCC(t *testing.T) {
channel.Request{
ChaincodeID: cc1ID,
Fcn: "invokecc",
Args: [][]byte{[]byte(cc2ID), []byte(`{"Args":["invoke","move","a","b","1"]}`)},
Args: [][]byte{[]byte(cc2ID), []byte(`{"Args":["invoke","set","x1","y1"]}`)},
},
channel.WithRetry(retry.DefaultChannelOpts),
)
Expand Down Expand Up @@ -477,7 +477,7 @@ func testChaincodeEventListener(t *testing.T, ccID string, chClient *channel.Cli

func testChaincodeError(t *testing.T, client *channel.Client, ccID string) {
// Try calling unknown function call and expect an error
r, err := client.Execute(channel.Request{ChaincodeID: ccID, Fcn: "DUMMY_FUNCTION", Args: integration.ExampleCCTxArgs()},
r, err := client.Execute(channel.Request{ChaincodeID: ccID, Fcn: "DUMMY_FUNCTION", Args: integration.ExampleCCTxRandomSetArgs()},
channel.WithRetry(retry.DefaultChannelOpts))

t.Logf("testChaincodeError err: %s ***** responses: %v", err, r)
Expand Down Expand Up @@ -545,7 +545,7 @@ func TestNoEndpoints(t *testing.T) {
}

// Test execute transaction: since peer has been disabled for endorsement this transaction should fail
_, err = chClient.Execute(channel.Request{ChaincodeID: mainChaincodeID, Fcn: "invoke", Args: integration.ExampleCCTxArgs()},
_, err = chClient.Execute(channel.Request{ChaincodeID: mainChaincodeID, Fcn: "invoke", Args: integration.ExampleCCTxRandomSetArgs()},
channel.WithRetry(retry.DefaultChannelOpts))
if !strings.Contains(err.Error(), expected1_1Err) && !strings.Contains(err.Error(), expected1_2Err) {
t.Fatal("Should have failed due to no chaincode query peers")
Expand Down
6 changes: 3 additions & 3 deletions test/integration/pkg/client/event/events_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func testCCEvent(ccID string, chClient *channel.Client, eventClient *event.Clien
}
defer eventClient.Unregister(reg)

response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: append(integration.ExampleCCTxArgs(), []byte(eventID))},
response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: append(integration.ExampleCCTxRandomSetArgs(), []byte(eventID))},
channel.WithRetry(retry.DefaultChannelOpts))
if err != nil {
t.Fatalf("Failed to move funds: %s", err)
Expand Down Expand Up @@ -127,7 +127,7 @@ func testRegisterBlockEvent(ccID string, chClient *channel.Client, eventClient *
}
defer eventClient.Unregister(breg)

response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: integration.ExampleCCTxArgs()},
response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: integration.ExampleCCTxRandomSetArgs()},
channel.WithRetry(retry.DefaultChannelOpts))
if err != nil {
t.Fatalf("Failed to move funds: %s", err)
Expand Down Expand Up @@ -155,7 +155,7 @@ func testRegisterFilteredBlockEvent(ccID string, chClient *channel.Client, event
}
defer eventClient.Unregister(fbreg)

response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: integration.ExampleCCTxArgs()},
response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: integration.ExampleCCTxRandomSetArgs()},
channel.WithRetry(retry.DefaultChannelOpts))
if err != nil {
t.Fatalf("Failed to move funds: %s", err)
Expand Down
73 changes: 68 additions & 5 deletions test/integration/pkg/fab/eventclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ import (
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
"github.com/hyperledger/fabric-sdk-go/test/integration"
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
)

const eventTimeWindow = 120 * time.Second // the maximum amount of time to watch for events.
const eventTimeWindow = 20 * time.Second // the maximum amount of time to watch for events.

func TestEventClient(t *testing.T) {
chainCodeID := mainChaincodeID
Expand Down Expand Up @@ -95,7 +96,7 @@ func testEventService(t *testing.T, testSetup *integration.BaseSetupImpl, sdk *f
numExpected++
wg.Add(1)

tpResponses, prop, txID := sendTxProposal(sdk, testSetup, t, transactor, chainCodeID)
tpResponses, prop, txID := sendTxProposal(sdk, testSetup, t, transactor, chainCodeID, integration.ExampleCCTxRandomSetArgs())
txReg, txstatusch, err := eventService.RegisterTxStatusEvent(txID)
if err != nil {
t.Fatalf("Error registering for Tx Status event: %s", err)
Expand Down Expand Up @@ -127,14 +128,14 @@ func testEventService(t *testing.T, testSetup *integration.BaseSetupImpl, sdk *f
}
}

func sendTxProposal(sdk *fabsdk.FabricSDK, testSetup *integration.BaseSetupImpl, t *testing.T, transactor fab.Transactor, chainCodeID string) ([]*fab.TransactionProposalResponse, *fab.TransactionProposal, string) {
func sendTxProposal(sdk *fabsdk.FabricSDK, testSetup *integration.BaseSetupImpl, t *testing.T, transactor fab.Transactor, chainCodeID string, args [][]byte) ([]*fab.TransactionProposalResponse, *fab.TransactionProposal, string) {
peers, err := getProposalProcessors(sdk, "Admin", testSetup.OrgID, testSetup.Targets)
require.Nil(t, err, "creating peers failed")
tpResponses, prop, err := createAndSendTransactionProposal(
transactor,
chainCodeID,
"invoke",
[][]byte{[]byte("move"), []byte("a"), []byte("b"), []byte("10")},
args,
peers,
nil,
)
Expand Down Expand Up @@ -162,6 +163,9 @@ func checkTxStatusEvent(wg *sync.WaitGroup, txstatusch <-chan *fab.TxStatusEvent
if txStatus.BlockNumber == 0 {
test.Failf(t, "Expecting non-zero block number")
}
if txStatus.TxValidationCode != pb.TxValidationCode_VALID {
test.Failf(t, "expected transaction validation code to be valid")
}
atomic.AddUint32(numReceived, 1)
case <-time.After(eventTimeWindow):
return
Expand Down Expand Up @@ -328,7 +332,7 @@ func testChannelEventsSeekOptions(t *testing.T, testSetup *integration.BaseSetup
defer eventService.Unregister(ccreg)

// prepare and commit the transaction to generate events
tpResponses, prop, txID := sendTxProposal(sdk, testSetup, t, transactor, chainCodeID)
tpResponses, prop, txID := sendTxProposal(sdk, testSetup, t, transactor, chainCodeID, integration.ExampleCCTxRandomSetArgs())
_, err = createAndSendTransaction(transactor, prop, tpResponses)
require.NoError(t, err, "First invoke failed err")

Expand Down Expand Up @@ -366,3 +370,62 @@ func testChannelEventsSeekOptions(t *testing.T, testSetup *integration.BaseSetup
//to event channel, and first event we get from event channel actually belongs to first transaction after registration.
require.Equal(t, seekType == "", txID == event.TxID, "for seek type[%s], txID [%s], event.txID[%s] ,condition didn't match", seekType, txID, event.TxID)
}

//TestEventClientWithMVCCReadConflicts tests behavior of chaincode events when MVCC_READ_CONFLICT happens
//Chaincode events with Txn Validation Code = MVCC_READ_CONFLICT are not getting published
func TestEventClientWithMVCCReadConflicts(t *testing.T) {
chainCodeID := mainChaincodeID
sdk := mainSDK
testSetup := mainTestSetup

chContextProvider := sdk.ChannelContext(testSetup.ChannelID, fabsdk.WithUser(org1User), fabsdk.WithOrg(org1Name))
chContext, err := chContextProvider()
require.NoError(t, err, "error getting channel context")

eventService, err := chContext.ChannelService().EventService()
require.NoError(t, err, "error getting event service")

testEventServiceWithConflicts(t, testSetup, sdk, chainCodeID, eventService)
}

func testEventServiceWithConflicts(t *testing.T, testSetup *integration.BaseSetupImpl, sdk *fabsdk.FabricSDK, chainCodeID string, eventService fab.EventService) {
_, cancel, transactor, err := getTransactor(sdk, testSetup.ChannelID, "Admin", testSetup.OrgID)
require.NoError(t, err, "Failed to get channel transactor")
defer cancel()

ccreg, cceventch, err := eventService.RegisterChaincodeEvent(chainCodeID, ".*")
require.NoError(t, err, "Error registering for filtered block events")
defer eventService.Unregister(ccreg)

numOfTxns := 4
// Commit multiple transactions to generate events
for i := 0; i < numOfTxns; i++ {
tpResponse, prop, _ := sendTxProposal(sdk, testSetup, t, transactor, chainCodeID, [][]byte{[]byte("move"), []byte("a"), []byte("b"), []byte("5")})
_, err = createAndSendTransaction(transactor, prop, tpResponse)
require.NoError(t, err, "invoke failed")
}

var numReceived int
test:
for {
select {
case event, ok := <-cceventch:
if !ok {
test.Failf(t, "unexpected closed channel while waiting for CC Status event")
}
t.Logf("Received chaincode event: %#v", event)
require.Equal(t, event.ChaincodeID, chainCodeID)
require.NotEmpty(t, event.SourceURL, "Expecting event source URL but got none")
require.NotEmpty(t, event.BlockNumber, "Expecting non-zero block number")
require.NotEmpty(t, event.TxID, "Expecting valid txID")
require.NotEmpty(t, event.EventName, "Expecting valid event name")
numReceived++
continue
case <-time.After(5 * time.Second):
break test
}
}

require.True(t, numReceived < numOfTxns, "Expected number of transactions to be greater than number of events")

}

0 comments on commit 02c1c6f

Please sign in to comment.