fix missing err check in the block commit path (#1543)
Signed-off-by: senthil <cendhu@gmail.com>
cendhu authored and mastersingh24 committed Sep 9, 2020
1 parent cc6dc99 commit 09764d8
Showing 14 changed files with 133 additions and 54 deletions.
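
Most hunks in this commit apply one mechanical pattern: a call whose returned error was previously discarded now checks and propagates that error to the caller. A minimal, self-contained sketch of the before/after pattern, using hypothetical names rather than Fabric's actual API:

package main

import (
	"errors"
	"fmt"
)

type store struct{}

// writeBatch stands in for any call whose returned error used to be dropped.
func (s *store) writeBatch() error {
	return errors.New("disk full")
}

// commitBefore mirrors the old code: the error is silently discarded.
func commitBefore(s *store) error {
	s.writeBatch() // error ignored
	return nil
}

// commitAfter mirrors the fixed code: the error is checked and returned.
func commitAfter(s *store) error {
	if err := s.writeBatch(); err != nil {
		return err
	}
	return nil
}

func main() {
	fmt.Println(commitBefore(&store{})) // <nil> -- the failure goes unnoticed
	fmt.Println(commitAfter(&store{}))  // disk full
}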
34 changes: 32 additions & 2 deletions core/ledger/kvledger/hashcheck_pvtdata_test.go
@@ -16,15 +16,45 @@ import (
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/mock"
"github.com/hyperledger/fabric/protoutil"
"github.com/stretchr/testify/require"
)

func TestConstructValidInvalidBlocksPvtData(t *testing.T) {
conf, cleanup := testConfig(t)
defer cleanup()
provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{})
nsCollBtlConfs := []*nsCollBtlConfig{
{
namespace: "ns-1",
btlConfig: map[string]uint64{
"coll-1": 0,
"coll-2": 0,
},
},
{
namespace: "ns-2",
btlConfig: map[string]uint64{
"coll-2": 0,
},
},
{
namespace: "ns-4",
btlConfig: map[string]uint64{
"coll-2": 0,
},
},
{
namespace: "ns-6",
btlConfig: map[string]uint64{
"coll-2": 0,
},
},
}
provider := testutilNewProviderWithCollectionConfig(
t,
nsCollBtlConfs,
conf,
)
defer provider.Close()

_, gb := testutil.NewBlockGenerator(t, "testLedger", false)
4 changes: 1 addition & 3 deletions core/ledger/kvledger/kv_ledger.go
@@ -314,9 +314,7 @@ func (l *kvLedger) syncStateDBWithOldBlkPvtdata() error {
return err
}

l.pvtdataStore.ResetLastUpdatedOldBlocksList()

return nil
return l.pvtdataStore.ResetLastUpdatedOldBlocksList()
}

func (l *kvLedger) filterYetToCommitBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error {
53 changes: 35 additions & 18 deletions core/ledger/kvledger/kv_ledger_provider_test.go
@@ -623,43 +623,60 @@ func testutilNewProvider(conf *lgr.Config, t *testing.T, ccInfoProvider *mock.De
return provider
}

type nsCollBtlConfig struct {
namespace string
btlConfig map[string]uint64
}

func testutilNewProviderWithCollectionConfig(
t *testing.T,
namespace string,
btlConfigs map[string]uint64,
nsCollBtlConfigs []*nsCollBtlConfig,
conf *lgr.Config,
) *Provider {
provider := testutilNewProvider(conf, t, &mock.DeployedChaincodeInfoProvider{})
mockCCInfoProvider := provider.initializer.DeployedChaincodeInfoProvider.(*mock.DeployedChaincodeInfoProvider)
collMap := map[string]*peer.StaticCollectionConfig{}
var collConf []*peer.CollectionConfig
for collName, btl := range btlConfigs {
staticConf := &peer.StaticCollectionConfig{Name: collName, BlockToLive: btl}
collMap[collName] = staticConf
collectionConf := &peer.CollectionConfig{}
collectionConf.Payload = &peer.CollectionConfig_StaticCollectionConfig{StaticCollectionConfig: staticConf}
collConf = append(collConf, collectionConf)
collectionConfPkgs := []*peer.CollectionConfigPackage{}

nsCollMap := map[string]map[string]*peer.StaticCollectionConfig{}
for _, nsCollBtlConf := range nsCollBtlConfigs {
collMap := map[string]*peer.StaticCollectionConfig{}
var collConf []*peer.CollectionConfig
for collName, btl := range nsCollBtlConf.btlConfig {
staticConf := &peer.StaticCollectionConfig{Name: collName, BlockToLive: btl}
collMap[collName] = staticConf
collectionConf := &peer.CollectionConfig{}
collectionConf.Payload = &peer.CollectionConfig_StaticCollectionConfig{StaticCollectionConfig: staticConf}
collConf = append(collConf, collectionConf)
}
collectionConfPkgs = append(collectionConfPkgs, &peer.CollectionConfigPackage{Config: collConf})
nsCollMap[nsCollBtlConf.namespace] = collMap
}
collectionConfPkg := &peer.CollectionConfigPackage{Config: collConf}

mockCCInfoProvider.ChaincodeInfoStub = func(channelName, ccName string, qe lgr.SimpleQueryExecutor) (*lgr.DeployedChaincodeInfo, error) {
if ccName == namespace {
return &lgr.DeployedChaincodeInfo{
Name: namespace, ExplicitCollectionConfigPkg: collectionConfPkg}, nil
for i, nsCollBtlConf := range nsCollBtlConfigs {
if ccName == nsCollBtlConf.namespace {
return &lgr.DeployedChaincodeInfo{
Name: nsCollBtlConf.namespace, ExplicitCollectionConfigPkg: collectionConfPkgs[i]}, nil
}
}
return nil, nil
}

mockCCInfoProvider.AllCollectionsConfigPkgStub = func(channelName, ccName string, qe lgr.SimpleQueryExecutor) (*peer.CollectionConfigPackage, error) {
if ccName == namespace {
return collectionConfPkg, nil
for i, nsCollBtlConf := range nsCollBtlConfigs {
if ccName == nsCollBtlConf.namespace {
return collectionConfPkgs[i], nil
}
}
return nil, nil

}

mockCCInfoProvider.CollectionInfoStub = func(channelName, ccName, collName string, qe lgr.SimpleQueryExecutor) (*peer.StaticCollectionConfig, error) {
if ccName == namespace {
return collMap[collName], nil
for _, nsCollBtlConf := range nsCollBtlConfigs {
if ccName == nsCollBtlConf.namespace {
return nsCollMap[nsCollBtlConf.namespace][collName], nil
}
}
return nil, nil
}
18 changes: 10 additions & 8 deletions core/ledger/kvledger/kv_ledger_test.go
@@ -258,10 +258,15 @@ func TestKVLedgerBlockStorageWithPvtdata(t *testing.T) {
func TestKVLedgerDBRecovery(t *testing.T) {
conf, cleanup := testConfig(t)
defer cleanup()
nsCollBtlConfs := []*nsCollBtlConfig{
{
namespace: "ns",
btlConfig: map[string]uint64{"coll": 0},
},
}
provider1 := testutilNewProviderWithCollectionConfig(
t,
"ns",
map[string]uint64{"coll": 0},
nsCollBtlConfs,
conf,
)
defer provider1.Close()
@@ -328,8 +333,7 @@ func TestKVLedgerDBRecovery(t *testing.T) {
// StateDB and HistoryDB should be recovered before returning from NewKVLedger call
provider2 := testutilNewProviderWithCollectionConfig(
t,
"ns",
map[string]uint64{"coll": 0},
nsCollBtlConfs,
conf,
)
defer provider2.Close()
@@ -385,8 +389,7 @@ func TestKVLedgerDBRecovery(t *testing.T) {
// history DB should be recovered before returning from NewKVLedger call
provider3 := testutilNewProviderWithCollectionConfig(
t,
"ns",
map[string]uint64{"coll": 0},
nsCollBtlConfs,
conf,
)
defer provider3.Close()
@@ -443,8 +446,7 @@ func TestKVLedgerDBRecovery(t *testing.T) {
// state DB should be recovered before returning from NewKVLedger call
provider4 := testutilNewProviderWithCollectionConfig(
t,
"ns",
map[string]uint64{"coll": 0},
nsCollBtlConfs,
conf,
)
defer provider4.Close()
9 changes: 7 additions & 2 deletions core/ledger/kvledger/snapshot_test.go
@@ -31,10 +31,15 @@ func TestGenerateSnapshot(t *testing.T) {
conf, cleanup := testConfig(t)
defer cleanup()
snapshotRootDir := conf.SnapshotsConfig.RootDir
nsCollBtlConfs := []*nsCollBtlConfig{
{
namespace: "ns",
btlConfig: map[string]uint64{"coll": 0},
},
}
provider := testutilNewProviderWithCollectionConfig(
t,
"ns",
map[string]uint64{"coll": 0},
nsCollBtlConfs,
conf,
)
defer provider.Close()
15 changes: 10 additions & 5 deletions core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr.go
@@ -78,7 +78,9 @@ func (p *PurgeMgr) UpdateExpiryInfoOfPvtDataOfOldBlocks(pvtUpdates *privacyenabl
builder := newExpiryScheduleBuilder(p.btlPolicy)
pvtUpdateCompositeKeyMap := pvtUpdates.ToCompositeKeyMap()
for k, vv := range pvtUpdateCompositeKeyMap {
builder.add(k.Namespace, k.CollectionName, k.Key, util.ComputeStringHash(k.Key), vv)
if err := builder.add(k.Namespace, k.CollectionName, k.Key, util.ComputeStringHash(k.Key), vv); err != nil {
return err
}
}

var expiryInfoUpdates []*expiryInfo
@@ -212,7 +214,10 @@ func (p *PurgeMgr) prepareWorkingsetFor(expiringAtBlk uint64) *workingset {
// Transform the keys into the form such that for each hashed key that is eligible for purge appears in 'toPurge'
toPurge := transformToExpiryInfoMap(expiryInfo)
// Load the latest versions of the hashed keys
p.preloadCommittedVersionsInCache(toPurge)
if err = p.preloadCommittedVersionsInCache(toPurge); err != nil {
workingset.err = err
return workingset
}
var expiryInfoKeysToClear []*expiryInfoKey

if len(toPurge) == 0 {
@@ -266,15 +271,15 @@ return workingset
return workingset
}

func (p *PurgeMgr) preloadCommittedVersionsInCache(expInfoMap expiryInfoMap) {
func (p *PurgeMgr) preloadCommittedVersionsInCache(expInfoMap expiryInfoMap) error {
if !p.db.IsBulkOptimizable() {
return
return nil
}
var hashedKeys []*privacyenabledstate.HashedCompositeKey
for k := range expInfoMap {
hashedKeys = append(hashedKeys, &k)
}
p.db.LoadCommittedVersionsOfPubAndHashedKeys(nil, hashedKeys)
return p.db.LoadCommittedVersionsOfPubAndHashedKeys(nil, hashedKeys)
}

func transformToExpiryInfoMap(expiryInfo []*expiryInfo) expiryInfoMap {
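
The prepareWorkingsetFor hunk above uses a slightly different convention because that function returns a *workingset rather than an error: a failure from preloadCommittedVersionsInCache is recorded on the struct's err field and surfaced to the caller that way. A minimal, self-contained sketch of that convention, with hypothetical names rather than the actual PurgeMgr types:

package main

import (
	"errors"
	"fmt"
)

// workingset mirrors the idea of a result struct that carries its own error.
type workingset struct {
	keys []string
	err  error
}

// loadKeys stands in for a preload step that can fail.
func loadKeys() ([]string, error) {
	return nil, errors.New("statedb unavailable")
}

// prepareWorkingset never returns an error directly; a failure is stored on
// the returned struct so the caller can inspect it before using the result.
func prepareWorkingset() *workingset {
	ws := &workingset{}
	keys, err := loadKeys()
	if err != nil {
		ws.err = err
		return ws
	}
	ws.keys = keys
	return ws
}

func main() {
	ws := prepareWorkingset()
	if ws.err != nil {
		fmt.Println("prepare failed:", ws.err) // prepare failed: statedb unavailable
	}
}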
4 changes: 3 additions & 1 deletion core/ledger/kvledger/txmgmt/queryutil/iterator_combiner.go
@@ -67,7 +67,9 @@ func (combiner *itrCombiner) Next() (commonledger.QueryResult, error) {
}
}
kv := combiner.kvAt(smallestHolderIndex)
combiner.moveItrAndRemoveIfExhausted(smallestHolderIndex)
if _, err := combiner.moveItrAndRemoveIfExhausted(smallestHolderIndex); err != nil {
return nil, err
}
if kv.IsDelete() {
return combiner.Next()
}
7 changes: 4 additions & 3 deletions core/ledger/kvledger/txmgmt/rwsetutil/query_results_helper.go
@@ -106,7 +106,9 @@ func (helper *RangeQueryResultsHelper) Done() ([]*kvrwset.KVRead, *kvrwset.Query
return helper.pendingResults, nil, err
}
}
helper.mt.done()
if err := helper.mt.done(); err != nil {
return nil, nil, err
}
return helper.pendingResults, helper.mt.getSummery(), nil
}

@@ -132,8 +134,7 @@ func (helper *RangeQueryResultsHelper) processPendingResults() error {
if err != nil {
return err
}
helper.mt.update(hash)
return nil
return helper.mt.update(hash)
}

func serializeKVReads(kvReads []*kvrwset.KVRead) ([]byte, error) {
4 changes: 3 additions & 1 deletion core/ledger/kvledger/txmgmt/txmgr/lockbased_txmgr.go
@@ -106,7 +106,9 @@ func NewLockBasedTxMgr(initializer *Initializer) (*LockBasedTxMgr, error) {
return nil, errors.New("create new lock based TxMgr failed: passed in nil ledger hasher")
}

initializer.DB.Open()
if err := initializer.DB.Open(); err != nil {
return nil, err
}
txmgr := &LockBasedTxMgr{
ledgerid: initializer.LedgerID,
db: initializer.DB,
@@ -119,7 +119,9 @@ func (v *rangeQueryHashValidator) validate() (bool, error) {
return equals, nil
}
versionedKV := result.(*statedb.VersionedKV)
v.resultsHelper.AddResult(rwsetutil.NewKVRead(versionedKV.Key, versionedKV.Version))
if err := v.resultsHelper.AddResult(rwsetutil.NewKVRead(versionedKV.Key, versionedKV.Version)); err != nil {
return false, err
}
merkle := v.resultsHelper.GetMerkleSummary()

if merkle.MaxLevel < inMerkle.MaxLevel {
15 changes: 10 additions & 5 deletions core/ledger/kvledger/txmgmt/validation/tx_ops.go
@@ -18,8 +18,9 @@ import (
func prepareTxOps(rwset *rwsetutil.TxRwSet, txht *version.Height,
precedingUpdates *publicAndHashUpdates, db *privacyenabledstate.DB) (txOps, error) {
txops := txOps{}
txops.applyTxRwset(rwset)
//logger.Debugf("prepareTxOps() txops after applying raw rwset=%#v", spew.Sdump(txops))
if err := txops.applyTxRwset(rwset); err != nil {
return nil, err
}
for ck, keyop := range txops {
// check if the final state of the key, value and metadata, is already present in the transaction, then skip
// otherwise we need to retrieve latest state and merge in the current value or metadata update
@@ -62,7 +63,9 @@ func (txops txOps) applyTxRwset(rwset *rwsetutil.TxRwSet) error {
txops.applyKVWrite(ns, "", kvWrite)
}
for _, kvMetadataWrite := range nsRWSet.KvRwSet.MetadataWrites {
txops.applyMetadata(ns, "", kvMetadataWrite)
if err := txops.applyMetadata(ns, "", kvMetadataWrite); err != nil {
return err
}
}

// apply collection level kvwrite and kvMetadataWrite
@@ -79,12 +82,14 @@
}

for _, metadataWrite := range collHashRWset.HashedRwSet.MetadataWrites {
txops.applyMetadata(ns, coll,
if err := txops.applyMetadata(ns, coll,
&kvrwset.KVMetadataWrite{
Key: string(metadataWrite.KeyHash),
Entries: metadataWrite.Entries,
},
)
); err != nil {
return err
}
}
}
}
8 changes: 6 additions & 2 deletions core/ledger/kvledger/txmgmt/validation/validator.go
@@ -102,7 +102,9 @@ func (v *validator) validateAndPrepareBatch(blk *block, doMVCCValidation bool) (
if validationCode == peer.TxValidationCode_VALID {
logger.Debugf("Block [%d] Transaction index [%d] TxId [%s] marked as valid by state validator. ContainsPostOrderWrites [%t]", blk.num, tx.indexInBlock, tx.id, tx.containsPostOrderWrites)
committingTxHeight := version.NewHeight(blk.num, uint64(tx.indexInBlock))
updates.applyWriteSet(tx.rwset, committingTxHeight, v.db, tx.containsPostOrderWrites)
if err := updates.applyWriteSet(tx.rwset, committingTxHeight, v.db, tx.containsPostOrderWrites); err != nil {
return nil, err
}
} else {
logger.Warningf("Block [%d] Transaction index [%d] TxId [%s] marked as invalid by state validator. Reason code [%s]",
blk.num, tx.indexInBlock, tx.id, validationCode.String())
@@ -228,7 +230,9 @@ func (v *validator) validateRangeQuery(ns string, rangeQueryInfo *kvrwset.RangeQ
logger.Debug(`Hashing results are not present in the range query info hence, initiating raw KVReads based validation`)
qv = &rangeQueryResultsValidator{}
}
qv.init(rangeQueryInfo, combinedItr)
if err := qv.init(rangeQueryInfo, combinedItr); err != nil {
return false, err
}
return qv.validate()
}

8 changes: 6 additions & 2 deletions core/ledger/pvtdatastorage/helper.go
@@ -77,12 +77,16 @@ func prepareExpiryEntries(committingBlk uint64, dataEntries []*dataEntry, missin

// 1. prepare expiryData for non-missing data
for _, dataEntry := range dataEntries {
prepareExpiryEntriesForPresentData(mapByExpiringBlk, dataEntry.key, btlPolicy)
if err := prepareExpiryEntriesForPresentData(mapByExpiringBlk, dataEntry.key, btlPolicy); err != nil {
return nil, err
}
}

// 2. prepare expiryData for missing data
for missingDataKey := range missingDataEntries {
prepareExpiryEntriesForMissingData(mapByExpiringBlk, &missingDataKey, btlPolicy)
if err := prepareExpiryEntriesForMissingData(mapByExpiringBlk, &missingDataKey, btlPolicy); err != nil {
return nil, err
}
}

for expiryBlk, expiryData := range mapByExpiringBlk {
4 changes: 3 additions & 1 deletion core/ledger/pvtdatastorage/store.go
@@ -764,7 +764,9 @@ func (s *Store) purgeExpiredData(minBlkNum, maxBlkNum uint64) error {
for _, missingDataKey := range missingDataKeys {
batch.Delete(encodeMissingDataKey(missingDataKey))
}
s.db.WriteBatch(batch, false)
if err := s.db.WriteBatch(batch, false); err != nil {
return err
}
batch.Reset()
}
logger.Infof("[%s] - [%d] Entries purged from private data storage till block number [%d]", s.ledgerid, len(expiryEntries), maxBlkNum)
