Skip to content

Commit

Permalink
Locate correct block number for transaction ID in ChaincodeEvents (#3289
Browse files Browse the repository at this point in the history
)

If the request contains an `after_transaction_id`, start reading from the block containing that transaction ID and ignore any specified start block. If the specified transaction has not been committed in a block, fall back to any specified start block or next committed block.

Signed-off-by: Mark S. Lewis <mark_lewis@uk.ibm.com>
  • Loading branch information
bestbeforetoday authored and denyeart committed Jun 3, 2022
1 parent f64eea2 commit 8ffd334
Show file tree
Hide file tree
Showing 16 changed files with 680 additions and 778 deletions.
15 changes: 13 additions & 2 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
ab "github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/core/aclmgmt/resources"
"github.com/hyperledger/fabric/core/chaincode"
"github.com/hyperledger/fabric/internal/pkg/gateway/event"
"github.com/hyperledger/fabric/internal/pkg/gateway/ledger"
"github.com/hyperledger/fabric/protoutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -543,7 +543,7 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest
return status.Error(codes.NotFound, err.Error())
}

startBlock, err := startBlockFromLedgerPosition(ledger, request.GetStartPosition())
startBlock, err := chaincodeEventsStartBlock(ledger, request)
if err != nil {
return err
}
Expand Down Expand Up @@ -611,6 +611,17 @@ func chaincodeEventMatcher(request *gp.ChaincodeEventsRequest) func(event *peer.
}
}

func chaincodeEventsStartBlock(ledger ledger.Ledger, request *gp.ChaincodeEventsRequest) (uint64, error) {
afterTransactionID := request.GetAfterTransactionId()
if len(afterTransactionID) > 0 {
if block, err := ledger.GetBlockByTxID(afterTransactionID); err == nil {
return block.GetHeader().GetNumber(), nil
}
}

return startBlockFromLedgerPosition(ledger, request.GetStartPosition())
}

func startBlockFromLedgerPosition(ledger ledger.Ledger, position *ab.SeekPosition) (uint64, error) {
switch seek := position.GetType().(type) {
case nil:
Expand Down
92 changes: 71 additions & 21 deletions internal/pkg/gateway/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ import (
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/flogging/mock"
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
gdiscovery "github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/internal/pkg/gateway/commit"
"github.com/hyperledger/fabric/internal/pkg/gateway/config"
ledgermocks "github.com/hyperledger/fabric/internal/pkg/gateway/ledger/mocks"
"github.com/hyperledger/fabric/internal/pkg/gateway/mocks"
idmocks "github.com/hyperledger/fabric/internal/pkg/identity/mocks"
"github.com/hyperledger/fabric/protoutil"
Expand Down Expand Up @@ -78,19 +79,9 @@ type aclChecker interface {
ACLChecker
}

//go:generate counterfeiter -o mocks/ledgerprovider.go --fake-name LedgerProvider . ledgerProvider
type ledgerProvider interface {
LedgerProvider
}

//go:generate counterfeiter -o mocks/ledger.go --fake-name Ledger . mockLedger
type mockLedger interface {
commonledger.Ledger
}

//go:generate counterfeiter -o mocks/resultsiterator.go --fake-name ResultsIterator . resultsIterator
//go:generate counterfeiter -o mocks/resultsiterator.go --fake-name ResultsIterator . mockResultsIterator
type mockResultsIterator interface {
commonledger.ResultsIterator
ledger.ResultsIterator
}

type (
Expand Down Expand Up @@ -169,8 +160,8 @@ type preparedTest struct {
finder *mocks.CommitFinder
eventsServer *mocks.ChaincodeEventsServer
policy *mocks.ACLChecker
ledgerProvider *mocks.LedgerProvider
ledger *mocks.Ledger
ledgerProvider *ledgermocks.Provider
ledger *ledgermocks.Ledger
blockIterator *mocks.ResultsIterator
logLevel string
logFields []string
Expand Down Expand Up @@ -1652,7 +1643,7 @@ func TestChaincodeEvents(t *testing.T) {
},
},
{
name: "identifies previous transaction ID if from different chaincode",
name: "identifies specified transaction if from different chaincode",
blocks: []*cp.Block{
differentChaincodePartReadBlock,
},
Expand All @@ -1672,7 +1663,7 @@ func TestChaincodeEvents(t *testing.T) {
},
},
{
name: "identifies previous transaction ID if not in start block",
name: "identifies specified transaction if not in first read block",
blocks: []*cp.Block{
noMatchingEventsBlock,
partReadBlock,
Expand Down Expand Up @@ -1781,6 +1772,65 @@ func TestChaincodeEvents(t *testing.T) {
require.EqualValues(t, 101, test.ledger.GetBlocksIteratorArgsForCall(0))
},
},
{
name: "uses block containing specified transaction instead of start block",
blocks: []*cp.Block{
matchingEventBlock,
},
postSetup: func(t *testing.T, test *preparedTest) {
ledgerInfo := &cp.BlockchainInfo{
Height: 101,
}
test.ledger.GetBlockchainInfoReturns(ledgerInfo, nil)

block := &cp.Block{
Header: &cp.BlockHeader{
Number: 99,
},
}
test.ledger.GetBlockByTxIDReturns(block, nil)
},
afterTxID: "TX_ID",
startPosition: &ab.SeekPosition{
Type: &ab.SeekPosition_Specified{
Specified: &ab.SeekSpecified{
Number: 1,
},
},
},
postTest: func(t *testing.T, test *preparedTest) {
require.Equal(t, 1, test.ledger.GetBlocksIteratorCallCount())
require.EqualValues(t, 99, test.ledger.GetBlocksIteratorArgsForCall(0))
require.Equal(t, 1, test.ledger.GetBlockByTxIDCallCount())
require.Equal(t, "TX_ID", test.ledger.GetBlockByTxIDArgsForCall(0))
},
},
{
name: "uses start block if specified transaction not found",
blocks: []*cp.Block{
matchingEventBlock,
},
postSetup: func(t *testing.T, test *preparedTest) {
ledgerInfo := &cp.BlockchainInfo{
Height: 101,
}
test.ledger.GetBlockchainInfoReturns(ledgerInfo, nil)

test.ledger.GetBlockByTxIDReturns(nil, errors.New("NOT_FOUND"))
},
afterTxID: "TX_ID",
startPosition: &ab.SeekPosition{
Type: &ab.SeekPosition_Specified{
Specified: &ab.SeekSpecified{
Number: 1,
},
},
},
postTest: func(t *testing.T, test *preparedTest) {
require.Equal(t, 1, test.ledger.GetBlocksIteratorCallCount())
require.EqualValues(t, 1, test.ledger.GetBlocksIteratorArgsForCall(0))
},
},
{
name: "returns error for unsupported start position type",
blocks: []*cp.Block{
Expand Down Expand Up @@ -1899,7 +1949,7 @@ func TestNilArgs(t *testing.T) {
&mocks.Discovery{},
&mocks.CommitFinder{},
&mocks.ACLChecker{},
&mocks.LedgerProvider{},
&ledgermocks.Provider{},
gdiscovery.NetworkMember{
PKIid: common.PKIidType("id1"),
Endpoint: "localhost:7051",
Expand Down Expand Up @@ -1978,7 +2028,7 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest {
blockChannel <- block
}
close(blockChannel)
mockBlockIterator.NextCalls(func() (commonledger.QueryResult, error) {
mockBlockIterator.NextCalls(func() (ledger.QueryResult, error) {
if tt.eventErr != nil {
return nil, tt.eventErr
}
Expand All @@ -1991,14 +2041,14 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest {
return block, nil
})

mockLedger := &mocks.Ledger{}
mockLedger := &ledgermocks.Ledger{}
ledgerInfo := &cp.BlockchainInfo{
Height: 1,
}
mockLedger.GetBlockchainInfoReturns(ledgerInfo, nil)
mockLedger.GetBlocksIteratorReturns(mockBlockIterator, nil)

mockLedgerProvider := &mocks.LedgerProvider{}
mockLedgerProvider := &ledgermocks.Provider{}
mockLedgerProvider.LedgerReturns(mockLedger, nil)

validProposal := createProposal(t, testChannel, testChaincode, tt.transientData)
Expand Down
20 changes: 10 additions & 10 deletions internal/pkg/gateway/commit/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"

"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/internal/pkg/gateway/ledger"

"github.com/pkg/errors"
)
Expand All @@ -22,21 +23,15 @@ type Status struct {
Code peer.TxValidationCode
}

// QueryProvider provides status of previously committed transactions on a given channel. An error is returned if the
// transaction is not present in the ledger.
type QueryProvider interface {
TransactionStatus(channelName string, transactionID string) (peer.TxValidationCode, uint64, error)
}

// Finder is used to obtain transaction status.
type Finder struct {
query QueryProvider
provider ledger.Provider
notifier *Notifier
}

func NewFinder(query QueryProvider, notifier *Notifier) *Finder {
func NewFinder(provider ledger.Provider, notifier *Notifier) *Finder {
return &Finder{
query: query,
provider: provider,
notifier: notifier,
}
}
Expand All @@ -53,7 +48,12 @@ func (finder *Finder) TransactionStatus(ctx context.Context, channelName string,
return nil, err
}

if code, blockNumber, err := finder.query.TransactionStatus(channelName, transactionID); err == nil {
ledger, err := finder.provider.Ledger(channelName)
if err != nil {
return nil, err
}

if code, blockNumber, err := ledger.GetTxValidationCodeByTxID(transactionID); err == nil {
status := &Status{
BlockNumber: blockNumber,
TransactionID: transactionID,
Expand Down
67 changes: 23 additions & 44 deletions internal/pkg/gateway/commit/finder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,11 @@ import (

"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/internal/pkg/gateway/commit/mocks"
"github.com/hyperledger/fabric/internal/pkg/gateway/ledger/mocks"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)

//go:generate counterfeiter -o mocks/queryprovider.go --fake-name QueryProvider . queryProvider
type queryProvider interface { // Mimic QueryProvider to avoid circular import with generated mock
QueryProvider
}

func TestFinder(t *testing.T) {
sendUntilDone := func(commitSend chan<- *ledger.CommitNotification, msg *ledger.CommitNotification) chan struct{} {
done := make(chan struct{})
Expand All @@ -40,35 +35,39 @@ func TestFinder(t *testing.T) {
return done
}

t.Run("passes channel name to query provider", func(t *testing.T) {
provider := &mocks.QueryProvider{}
provider.TransactionStatusReturns(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
newMocks := func(code peer.TxValidationCode, blockNumber uint64, err error) (*mocks.Provider, *mocks.Ledger) {
provider, ledger := newLedgerMocks()
ledger.GetTxValidationCodeByTxIDReturns(code, blockNumber, err)

return provider, ledger
}

t.Run("passes channel name to provider", func(t *testing.T) {
provider, _ := newMocks(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
finder := NewFinder(provider, newTestNotifier(nil))

finder.TransactionStatus(context.Background(), "CHANNEL", "TX_ID")

require.Equal(t, 1, provider.TransactionStatusCallCount())
require.Equal(t, 1, provider.LedgerCallCount())

actual, _ := provider.TransactionStatusArgsForCall(0)
actual := provider.LedgerArgsForCall(0)
require.Equal(t, "CHANNEL", actual)
})

t.Run("passes transaction ID to query provider", func(t *testing.T) {
provider := &mocks.QueryProvider{}
provider.TransactionStatusReturns(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
t.Run("passes transaction ID to ledger", func(t *testing.T) {
provider, ledger := newMocks(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
finder := NewFinder(provider, newTestNotifier(nil))

finder.TransactionStatus(context.Background(), "CHANNEL", "TX_ID")

require.Equal(t, 1, provider.TransactionStatusCallCount())
require.Equal(t, 1, ledger.GetTxValidationCodeByTxIDCallCount())

_, actual := provider.TransactionStatusArgsForCall(0)
actual := ledger.GetTxValidationCodeByTxIDArgsForCall(0)
require.Equal(t, "TX_ID", actual)
})

t.Run("returns previously committed transaction status", func(t *testing.T) {
provider := &mocks.QueryProvider{}
provider.TransactionStatusReturns(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
provider, _ := newMocks(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
finder := NewFinder(provider, newTestNotifier(nil))

actual, err := finder.TransactionStatus(context.Background(), "CHANNEL", "TX_ID")
Expand All @@ -83,8 +82,7 @@ func TestFinder(t *testing.T) {
})

t.Run("returns notified transaction status when no previous commit", func(t *testing.T) {
provider := &mocks.QueryProvider{}
provider.TransactionStatusReturns(0, 0, errors.New("NOT_FOUND"))
provider, _ := newMocks(peer.TxValidationCode_MVCC_READ_CONFLICT, 101, nil)
commitSend := make(chan *ledger.CommitNotification)
finder := NewFinder(provider, newTestNotifier(commitSend))

Expand All @@ -111,34 +109,16 @@ func TestFinder(t *testing.T) {
})

t.Run("returns error from notifier", func(t *testing.T) {
provider := &mocks.QueryProvider{}
provider.TransactionStatusReturns(0, 0, errors.New("NOT_FOUND"))
supplier := &mocks.NotificationSupplier{}
supplier.CommitNotificationsReturns(nil, errors.New("MY_ERROR"))
finder := NewFinder(provider, NewNotifier(supplier))
provider, ledger := newMocks(0, 0, errors.New("NOT_FOUND"))
ledger.CommitNotificationsChannelReturns(nil, errors.New("MY_ERROR"))
finder := NewFinder(provider, NewNotifier(provider))

_, err := finder.TransactionStatus(context.Background(), "CHANNEL", "TX_ID")
require.ErrorContains(t, err, "MY_ERROR")
})

t.Run("passes channel name to supplier", func(t *testing.T) {
provider := &mocks.QueryProvider{}
provider.TransactionStatusReturns(0, 0, errors.New("NOT_FOUND"))
supplier := &mocks.NotificationSupplier{}
supplier.CommitNotificationsReturns(nil, errors.New("MY_ERROR"))
finder := NewFinder(provider, NewNotifier(supplier))

finder.TransactionStatus(context.Background(), "CHANNEL", "TX_ID")

require.Equal(t, 1, supplier.CommitNotificationsCallCount())

_, actual := supplier.CommitNotificationsArgsForCall(0)
require.Equal(t, "CHANNEL", actual)
})

t.Run("returns context error when context cancelled", func(t *testing.T) {
provider := &mocks.QueryProvider{}
provider.TransactionStatusReturns(0, 0, errors.New("NOT_FOUND"))
provider, _ := newMocks(0, 0, errors.New("NOT_FOUND"))
finder := NewFinder(provider, newTestNotifier(nil))

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -149,8 +129,7 @@ func TestFinder(t *testing.T) {
})

t.Run("returns error when notification supplier fails", func(t *testing.T) {
provider := &mocks.QueryProvider{}
provider.TransactionStatusReturns(0, 0, errors.New("NOT_FOUND"))
provider, _ := newMocks(0, 0, errors.New("NOT_FOUND"))
commitSend := make(chan *ledger.CommitNotification)
close(commitSend)
finder := NewFinder(provider, newTestNotifier(commitSend))
Expand Down
Loading

0 comments on commit 8ffd334

Please sign in to comment.