Skip to content

Commit

Permalink
Maintain order of transactions in the commit notification
Browse files Browse the repository at this point in the history
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi authored and denyeart committed May 20, 2021
1 parent cca66b6 commit f8070ec
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 45 deletions.
14 changes: 9 additions & 5 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,25 +933,29 @@ func (l *kvLedger) sendCommitNotification(blockNum uint64, txStatsInfo []*valida
close(l.commitNotifier.dataChannel)
l.commitNotifier = nil
default:
txs := map[string]*ledger.CommitNotificationTxInfo{}
txsByID := map[string]struct{}{}
txs := []*ledger.CommitNotificationTxInfo{}
for _, t := range txStatsInfo {
txID := t.TxIDFromChannelHeader
_, ok := txs[txID]
_, ok := txsByID[txID]

if txID == "" || ok {
continue
}
txs[txID] = &ledger.CommitNotificationTxInfo{
txsByID[txID] = struct{}{}

txs = append(txs, &ledger.CommitNotificationTxInfo{
TxType: t.TxType,
TxID: t.TxIDFromChannelHeader,
ValidationCode: t.ValidationCode,
ChaincodeID: t.ChaincodeID,
ChaincodeEventData: t.ChaincodeEventData,
}
})
}

l.commitNotifier.dataChannel <- &ledger.CommitNotification{
BlockNumber: blockNum,
TxsByTxID: txs,
TxsInfo: txs,
}
}
}
Expand Down
22 changes: 13 additions & 9 deletions core/ledger/kvledger/kv_ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,18 +1159,20 @@ func TestCommitNotifications(t *testing.T) {
require.Equal(t,
&ledger.CommitNotification{
BlockNumber: 1,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"txid_1": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "txid_1",
TxType: common.HeaderType_ENDORSER_TRANSACTION,
ValidationCode: peer.TxValidationCode_BAD_RWSET,
ChaincodeID: &peer.ChaincodeID{Name: "cc1"},
ChaincodeEventData: []byte("cc1_event"),
TxType: common.HeaderType_ENDORSER_TRANSACTION,
},
"txid_2": {
{
TxID: "txid_2",
TxType: common.HeaderType_ENDORSER_TRANSACTION,
ValidationCode: peer.TxValidationCode_VALID,
ChaincodeID: &peer.ChaincodeID{Name: "cc2"},
ChaincodeEventData: []byte("cc2_event"),
TxType: common.HeaderType_ENDORSER_TRANSACTION,
},
},
},
Expand All @@ -1195,7 +1197,7 @@ func TestCommitNotifications(t *testing.T) {
require.Equal(t,
&ledger.CommitNotification{
BlockNumber: 1,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{},
TxsInfo: []*ledger.CommitNotificationTxInfo{},
},
commitNotification,
)
Expand Down Expand Up @@ -1279,15 +1281,17 @@ func TestCommitNotificationsOnBlockCommit(t *testing.T) {
require.Equal(t,
&ledger.CommitNotification{
BlockNumber: 1,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"txid_1": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxType: common.HeaderType_ENDORSER_TRANSACTION,
TxID: "txid_1",
ValidationCode: peer.TxValidationCode_VALID,
ChaincodeID: &peer.ChaincodeID{Name: "foo", Version: "v1"},
ChaincodeEventData: []byte("foo-event"),
},
"txid_2": {
{
TxType: common.HeaderType_ENDORSER_TRANSACTION,
TxID: "txid_2",
ValidationCode: peer.TxValidationCode_MVCC_READ_CONFLICT,
ChaincodeID: &peer.ChaincodeID{Name: "bar", Version: "v2"},
ChaincodeEventData: []byte("bar-event"),
Expand Down
7 changes: 5 additions & 2 deletions core/ledger/ledger_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,10 +733,12 @@ type HashProvider interface {
GetHash(opts bccsp.HashOpts) (hash.Hash, error)
}

// CommitNotification is sent on each block commit to the channel returned by PeerLedger.CommitNotificationsChannel()
// CommitNotification is sent on each block commit to the channel returned by PeerLedger.CommitNotificationsChannel().
// TxsInfo field contains the info about individual transactions in the block in the order the transactions appear in the block
// The transactions with a unique and non-empty txID are included in the notification
type CommitNotification struct {
BlockNumber uint64
TxsByTxID map[string]*CommitNotificationTxInfo
TxsInfo []*CommitNotificationTxInfo
}

// CommitNotificationTxInfo contains the details of a transaction that is included in the CommitNotification
Expand All @@ -745,6 +747,7 @@ type CommitNotification struct {
// However, it is guaranteed be non-nil if the transaction is a valid endorser transaction.
type CommitNotificationTxInfo struct {
TxType common.HeaderType
TxID string
ValidationCode peer.TxValidationCode
ChaincodeID *peer.ChaincodeID
ChaincodeEventData []byte
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/gateway/commit/channelnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func (notifier *channelLevelNotifier) run() {
}

func (notifier *channelLevelNotifier) receiveBlock(blockCommit *ledger.CommitNotification) {
for transactionID, txInfo := range blockCommit.TxsByTxID {
for _, txInfo := range blockCommit.TxsInfo {
n := &notification{
BlockNumber: blockCommit.BlockNumber,
TransactionID: transactionID,
TransactionID: txInfo.TxID,
ValidationCode: txInfo.ValidationCode,
}
notifier.notify(n)
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/gateway/commit/finder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ func TestFinder(t *testing.T) {
commitSend := make(chan *ledger.CommitNotification)
msg := &ledger.CommitNotification{
BlockNumber: 1,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"TX_ID": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "TX_ID",
ValidationCode: peer.TxValidationCode_MVCC_READ_CONFLICT,
},
},
Expand Down
63 changes: 38 additions & 25 deletions internal/pkg/gateway/commit/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func TestNotifier(t *testing.T) {

commitSend <- &ledger.CommitNotification{
BlockNumber: 1,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"TX_ID": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "TX_ID",
ValidationCode: peer.TxValidationCode_MVCC_READ_CONFLICT,
},
},
Expand All @@ -82,11 +83,13 @@ func TestNotifier(t *testing.T) {

commitSend <- &ledger.CommitNotification{
BlockNumber: 1,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"WRONG_TX_ID": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "WRONG_TX_ID",
ValidationCode: peer.TxValidationCode_VALID,
},
"TX_ID": {
{
TxID: "TX_ID",
ValidationCode: peer.TxValidationCode_MVCC_READ_CONFLICT,
},
},
Expand All @@ -111,16 +114,18 @@ func TestNotifier(t *testing.T) {

commitSend <- &ledger.CommitNotification{
BlockNumber: 1,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"WRONG_TX_ID": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "WRONG_TX_ID",
ValidationCode: peer.TxValidationCode_VALID,
},
},
}
commitSend <- &ledger.CommitNotification{
BlockNumber: 2,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"TX_ID": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "TX_ID",
ValidationCode: peer.TxValidationCode_MVCC_READ_CONFLICT,
},
},
Expand All @@ -145,16 +150,18 @@ func TestNotifier(t *testing.T) {

commitSend <- &ledger.CommitNotification{
BlockNumber: 1,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"TX_ID": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "TX_ID",
ValidationCode: peer.TxValidationCode_MVCC_READ_CONFLICT,
},
},
}
commitSend <- &ledger.CommitNotification{
BlockNumber: 2,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"TX_ID": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "TX_ID",
ValidationCode: peer.TxValidationCode_MVCC_READ_CONFLICT,
},
},
Expand All @@ -179,16 +186,18 @@ func TestNotifier(t *testing.T) {

commitSend <- &ledger.CommitNotification{
BlockNumber: 1,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"TX_ID": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "TX_ID",
ValidationCode: peer.TxValidationCode_MVCC_READ_CONFLICT,
},
},
}
commitSend <- &ledger.CommitNotification{
BlockNumber: 2,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"TX_ID": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "TX_ID",
ValidationCode: peer.TxValidationCode_VALID,
},
},
Expand All @@ -211,8 +220,9 @@ func TestNotifier(t *testing.T) {
close(done)
commitSend <- &ledger.CommitNotification{
BlockNumber: 1,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"TX_ID": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "TX_ID",
ValidationCode: peer.TxValidationCode_MVCC_READ_CONFLICT,
},
},
Expand All @@ -235,8 +245,9 @@ func TestNotifier(t *testing.T) {

commitSend <- &ledger.CommitNotification{
BlockNumber: 1,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"TX_ID": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "TX_ID",
ValidationCode: peer.TxValidationCode_MVCC_READ_CONFLICT,
},
},
Expand Down Expand Up @@ -268,8 +279,9 @@ func TestNotifier(t *testing.T) {
close(done)
commitSend <- &ledger.CommitNotification{
BlockNumber: 1,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"TX_ID": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "TX_ID",
ValidationCode: peer.TxValidationCode_MVCC_READ_CONFLICT,
},
},
Expand Down Expand Up @@ -348,8 +360,9 @@ func TestNotifier(t *testing.T) {

commitSend2 <- &ledger.CommitNotification{
BlockNumber: 1,
TxsByTxID: map[string]*ledger.CommitNotificationTxInfo{
"TX_ID": {
TxsInfo: []*ledger.CommitNotificationTxInfo{
{
TxID: "TX_ID",
ValidationCode: peer.TxValidationCode_MVCC_READ_CONFLICT,
},
},
Expand Down

0 comments on commit f8070ec

Please sign in to comment.