diff --git a/fabric-client/events/eventhub.go b/fabric-client/events/eventhub.go index cbffbbca06..901d090d17 100644 --- a/fabric-client/events/eventhub.go +++ b/fabric-client/events/eventhub.go @@ -49,7 +49,7 @@ type EventHub interface { Disconnect() RegisterChaincodeEvent(ccid string, eventname string, callback func(*ChaincodeEvent)) *ChainCodeCBE UnregisterChaincodeEvent(cbe *ChainCodeCBE) - RegisterTxEvent(txID string, callback func(string, error)) + RegisterTxEvent(txID string, callback func(string, pb.TxValidationCode, error)) UnregisterTxEvent(txID string) RegisterBlockEvent(callback func(*common.Block)) UnregisterBlockEvent(callback func(*common.Block)) @@ -74,7 +74,7 @@ type eventHub struct { // Map of clients registered for block events blockRegistrants []func(*common.Block) // Map of clients registered for transactional events - txRegistrants map[string]func(string, error) + txRegistrants map[string]func(string, pb.TxValidationCode, error) // peer addr to connect to peerAddr string // peer tls certificate @@ -125,7 +125,7 @@ func (ccf *consumerClientFactory) newEventsClient(peerAddress string, certificat func NewEventHub() EventHub { chaincodeRegistrants := make(map[string][]*ChainCodeCBE) blockRegistrants := make([]func(*common.Block), 0) - txRegistrants := make(map[string]func(string, error)) + txRegistrants := make(map[string]func(string, pb.TxValidationCode, error)) eventHub := &eventHub{ chaincodeRegistrants: chaincodeRegistrants, @@ -395,7 +395,7 @@ func (eventHub *eventHub) UnregisterChaincodeEvent(cbe *ChainCodeCBE) { * @param {function} callback Function that takes a single parameter which * is a json object representation of type "message Transaction" */ -func (eventHub *eventHub) RegisterTxEvent(txID string, callback func(string, error)) { +func (eventHub *eventHub) RegisterTxEvent(txID string, callback func(string, pb.TxValidationCode, error)) { logger.Debugf("reg txid %s\n", txID) eventHub.mtx.Lock() @@ -424,6 +424,7 @@ func (eventHub *eventHub) txCallback(block *common.Block) { txFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) for i, v := range block.Data.Data { + if env, err := utils.GetEnvelopeFromBlock(v); err != nil { logger.Errorf("error extracting Envelope from block: %v\n", err) return @@ -446,10 +447,9 @@ func (eventHub *eventHub) txCallback(block *common.Block) { callback := eventHub.getTXRegistrant(channelHeader.TxId) if callback != nil { if txFilter.IsInvalid(i) { - callback(channelHeader.TxId, fmt.Errorf("Received invalid transaction from channel %s\n", channelHeader.ChannelId)) - + callback(channelHeader.TxId, txFilter.Flag(i), fmt.Errorf("Received invalid transaction from channel %s", channelHeader.ChannelId)) } else { - callback(channelHeader.TxId, nil) + callback(channelHeader.TxId, txFilter.Flag(i), nil) } } else { logger.Debugf("No callback registered for TxID: %s\n", channelHeader.TxId) @@ -487,7 +487,7 @@ func (eventHub *eventHub) getChaincodeRegistrants(chaincodeID string) []*ChainCo return clone } -func (eventHub *eventHub) getTXRegistrant(txID string) func(string, error) { +func (eventHub *eventHub) getTXRegistrant(txID string) func(string, pb.TxValidationCode, error) { eventHub.mtx.RLock() defer eventHub.mtx.RUnlock() return eventHub.txRegistrants[txID] diff --git a/fabric-client/events/eventhub_test.go b/fabric-client/events/eventhub_test.go index 19266c7723..c68388af5a 100644 --- a/fabric-client/events/eventhub_test.go +++ b/fabric-client/events/eventhub_test.go @@ -62,7 +62,7 @@ func TestDeadlock(t *testing.T) { go flood(eventsPerThread, threads, func() { transactionID := generateTxID() received := newCompletionHandler(timeout) - eventHub.RegisterTxEvent(transactionID, func(txID string, err error) { + eventHub.RegisterTxEvent(transactionID, func(txID string, code pb.TxValidationCode, err error) { txCompletion.done() received.done() }) diff --git a/fabric-client/helpers/events.go b/fabric-client/helpers/events.go index 935f994a4e..77bca1a6c5 100644 --- a/fabric-client/helpers/events.go +++ b/fabric-client/helpers/events.go @@ -19,7 +19,10 @@ limitations under the License. package helpers -import "github.com/hyperledger/fabric-sdk-go/fabric-client/events" +import ( + "github.com/hyperledger/fabric-sdk-go/fabric-client/events" + pb "github.com/hyperledger/fabric/protos/peer" +) // RegisterTxEvent registers on the given eventhub for the give transaction // returns a boolean channel which receives true when the event is complete @@ -28,7 +31,7 @@ func RegisterTxEvent(txID string, eventHub events.EventHub) (chan bool, chan err done := make(chan bool) fail := make(chan error) - eventHub.RegisterTxEvent(txID, func(txId string, err error) { + eventHub.RegisterTxEvent(txID, func(txId string, errorCode pb.TxValidationCode, err error) { if err != nil { logger.Debugf("Received error event for txid(%s)\n", txId) fail <- err diff --git a/test/integration/events_test.go b/test/integration/events_test.go index 293cfb44c0..3777a66337 100644 --- a/test/integration/events_test.go +++ b/test/integration/events_test.go @@ -27,6 +27,7 @@ import ( fabricClient "github.com/hyperledger/fabric-sdk-go/fabric-client" fcUtil "github.com/hyperledger/fabric-sdk-go/fabric-client/helpers" "github.com/hyperledger/fabric/protos/common" + pb "github.com/hyperledger/fabric/protos/peer" ) func TestEvents(t *testing.T) { @@ -55,6 +56,7 @@ func TestEvents(t *testing.T) { testFailedTx(t, testSetup) + testFailedTxErrorCode(t, testSetup) // Test disconnect event hub testSetup.EventHub.Disconnect() if testSetup.EventHub.IsConnected() { @@ -109,6 +111,7 @@ func testFailedTx(t *testing.T, testSetup BaseSetupImpl) { select { case <-done1: case <-fail1: + t.Fatalf("Received fail for second invoke") case <-done2: t.Fatalf("Received success for second invoke") case <-fail2: @@ -121,6 +124,84 @@ func testFailedTx(t *testing.T, testSetup BaseSetupImpl) { } +func testFailedTxErrorCode(t *testing.T, testSetup BaseSetupImpl) { + + // Arguments for events CC + var args []string + args = append(args, "invoke") + args = append(args, "invoke") + args = append(args, "SEVERE") + + tpResponses1, tx1, err := fcUtil.CreateAndSendTransactionProposal(testSetup.Chain, testSetup.ChainCodeID, testSetup.ChainID, args, []fabricClient.Peer{testSetup.Chain.GetPrimaryPeer()}, nil) + if err != nil { + t.Fatalf("CreateAndSendTransactionProposal return error: %v \n", err) + } + + tpResponses2, tx2, err := fcUtil.CreateAndSendTransactionProposal(testSetup.Chain, testSetup.ChainCodeID, testSetup.ChainID, args, []fabricClient.Peer{testSetup.Chain.GetPrimaryPeer()}, nil) + if err != nil { + t.Fatalf("CreateAndSendTransactionProposal return error: %v \n", err) + } + + done := make(chan bool) + fail := make(chan error) + var errorValidationCode pb.TxValidationCode + testSetup.EventHub.RegisterTxEvent(tx1, func(txId string, errorCode pb.TxValidationCode, err error) { + if err != nil { + errorValidationCode = errorCode + fail <- err + } else { + done <- true + } + }) + + defer testSetup.EventHub.UnregisterTxEvent(tx1) + + done2 := make(chan bool) + fail2 := make(chan error) + + testSetup.EventHub.RegisterTxEvent(tx2, func(txId string, errorCode pb.TxValidationCode, err error) { + if err != nil { + errorValidationCode = errorCode + fail2 <- err + } else { + done2 <- true + } + }) + + defer testSetup.EventHub.UnregisterTxEvent(tx2) + + // Test invalid transaction: create 2 invoke requests in quick succession that modify + // the same state variable which should cause one invoke to be invalid + _, err = fcUtil.CreateAndSendTransaction(testSetup.Chain, tpResponses1) + if err != nil { + t.Fatalf("First invoke failed err: %v", err) + } + _, err = fcUtil.CreateAndSendTransaction(testSetup.Chain, tpResponses2) + if err != nil { + t.Fatalf("Second invoke failed err: %v", err) + } + + for i := 0; i < 2; i++ { + select { + case <-done: + case <-fail: + t.Fatalf("Received fail for second invoke") + case <-done2: + t.Fatalf("Received success for second invoke") + case <-fail2: + // success + fmt.Println("Received error validation Code ", errorValidationCode) + if errorValidationCode.String() != "MVCC_READ_CONFLICT" { + t.Fatalf("Expected error code MVCC_READ_CONFLICT") + } + return + case <-time.After(time.Second * 30): + t.Fatalf("invoke Didn't receive block event for txid1(%s) or txid1(%s)", tx1, tx2) + } + } + +} + func testMultipleBlockEventCallbacks(t *testing.T, testSetup BaseSetupImpl) { // Arguments for events CC