Skip to content

Commit

Permalink
deprioritize unreconcilable missingPvtData (hyperledger#1721)
Browse files Browse the repository at this point in the history
* deprioritize unreconcilable missingPvtData

Signed-off-by: senthil <cendhu@gmail.com>

* address review comment

Signed-off-by: senthil <cendhu@gmail.com>

* add review comments

Signed-off-by: senthil <cendhu@gmail.com>

* address review comments

Signed-off-by: senthil <cendhu@gmail.com>
  • Loading branch information
cendhu committed Sep 8, 2020
1 parent bb5aa49 commit f108f93
Show file tree
Hide file tree
Showing 9 changed files with 1,182 additions and 424 deletions.
2 changes: 1 addition & 1 deletion core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ func (l *kvLedger) CommitPvtDataOfOldBlocks(reconciledPvtdata []*ledger.Reconcil

logger.Debugf("[%s:] Committing pvtData of [%d] old blocks to the pvtdatastore", l.ledgerID, len(reconciledPvtdata))

err = l.pvtdataStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData)
err = l.pvtdataStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData, nil)
if err != nil {
return nil, err
}
Expand Down
90 changes: 62 additions & 28 deletions core/ledger/pvtdatastorage/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@ func prepareStoreEntries(blockNum uint64, pvtData []*ledger.TxPvtData, btlPolicy
missingPvtData ledger.TxMissingPvtDataMap) (*storeEntries, error) {
dataEntries := prepareDataEntries(blockNum, pvtData)

missingDataEntries := prepareMissingDataEntries(blockNum, missingPvtData)
elgMissingDataEntries, inelgMissingDataEntries := prepareMissingDataEntries(blockNum, missingPvtData)

expiryEntries, err := prepareExpiryEntries(blockNum, dataEntries, missingDataEntries, btlPolicy)
expiryEntries, err := prepareExpiryEntries(blockNum, dataEntries, elgMissingDataEntries, inelgMissingDataEntries, btlPolicy)
if err != nil {
return nil, err
}

return &storeEntries{
dataEntries: dataEntries,
expiryEntries: expiryEntries,
missingDataEntries: missingDataEntries}, nil
dataEntries: dataEntries,
expiryEntries: expiryEntries,
elgMissingDataEntries: elgMissingDataEntries,
inelgMissingDataEntries: inelgMissingDataEntries,
}, nil
}

func prepareDataEntries(blockNum uint64, pvtData []*ledger.TxPvtData) []*dataEntry {
Expand All @@ -48,42 +50,61 @@ func prepareDataEntries(blockNum uint64, pvtData []*ledger.TxPvtData) []*dataEnt
return dataEntries
}

func prepareMissingDataEntries(committingBlk uint64, missingPvtData ledger.TxMissingPvtDataMap) map[missingDataKey]*bitset.BitSet {
missingDataEntries := make(map[missingDataKey]*bitset.BitSet)
func prepareMissingDataEntries(
committingBlk uint64,
missingPvtData ledger.TxMissingPvtDataMap,
) (map[missingDataKey]*bitset.BitSet, map[missingDataKey]*bitset.BitSet) {
elgMissingDataEntries := make(map[missingDataKey]*bitset.BitSet)
inelgMissingDataEntries := make(map[missingDataKey]*bitset.BitSet)

for txNum, missingData := range missingPvtData {
for _, nsColl := range missingData {
key := missingDataKey{nsCollBlk{nsColl.Namespace, nsColl.Collection, committingBlk},
nsColl.IsEligible}

if _, ok := missingDataEntries[key]; !ok {
missingDataEntries[key] = &bitset.BitSet{}
key := missingDataKey{
nsCollBlk{
ns: nsColl.Namespace,
coll: nsColl.Collection,
blkNum: committingBlk,
},
}
bitmap := missingDataEntries[key]

bitmap.Set(uint(txNum))
switch nsColl.IsEligible {
case true:
if _, ok := elgMissingDataEntries[key]; !ok {
elgMissingDataEntries[key] = &bitset.BitSet{}
}
elgMissingDataEntries[key].Set(uint(txNum))
default:
if _, ok := inelgMissingDataEntries[key]; !ok {
inelgMissingDataEntries[key] = &bitset.BitSet{}
}
inelgMissingDataEntries[key].Set(uint(txNum))
}
}
}

return missingDataEntries
return elgMissingDataEntries, inelgMissingDataEntries
}

// prepareExpiryEntries returns expiry entries for both private data which is present in the committingBlk
// and missing private.
func prepareExpiryEntries(committingBlk uint64, dataEntries []*dataEntry, missingDataEntries map[missingDataKey]*bitset.BitSet,
func prepareExpiryEntries(committingBlk uint64, dataEntries []*dataEntry, elgMissingDataEntries, inelgMissingDataEntries map[missingDataKey]*bitset.BitSet,
btlPolicy pvtdatapolicy.BTLPolicy) ([]*expiryEntry, error) {
var expiryEntries []*expiryEntry
mapByExpiringBlk := make(map[uint64]*ExpiryData)

// 1. prepare expiryData for non-missing data
for _, dataEntry := range dataEntries {
if err := prepareExpiryEntriesForPresentData(mapByExpiringBlk, dataEntry.key, btlPolicy); err != nil {
return nil, err
}
}

// 2. prepare expiryData for missing data
for missingDataKey := range missingDataEntries {
for missingDataKey := range elgMissingDataEntries {
if err := prepareExpiryEntriesForMissingData(mapByExpiringBlk, &missingDataKey, btlPolicy); err != nil {
return nil, err
}
}

for missingDataKey := range inelgMissingDataEntries {
if err := prepareExpiryEntriesForMissingData(mapByExpiringBlk, &missingDataKey, btlPolicy); err != nil {
return nil, err
}
Expand Down Expand Up @@ -139,25 +160,38 @@ func getOrCreateExpiryData(mapByExpiringBlk map[uint64]*ExpiryData, expiringBlk
}

// deriveKeys constructs dataKeys and missingDataKey from an expiryEntry
func deriveKeys(expiryEntry *expiryEntry) (dataKeys []*dataKey, missingDataKeys []*missingDataKey) {
func deriveKeys(expiryEntry *expiryEntry) ([]*dataKey, []*missingDataKey) {
var dataKeys []*dataKey
var missingDataKeys []*missingDataKey

for ns, colls := range expiryEntry.value.Map {
// 1. constructs dataKeys of expired existing pvt data
for coll, txNums := range colls.Map {
for _, txNum := range txNums.List {
dataKeys = append(dataKeys,
&dataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, txNum})
&dataKey{
nsCollBlk: nsCollBlk{
ns: ns,
coll: coll,
blkNum: expiryEntry.key.committingBlk,
},
txNum: txNum,
})
}
}
// 2. constructs missingDataKeys of expired missing pvt data

for coll := range colls.MissingDataMap {
// one key for eligible entries and another for ieligible entries
missingDataKeys = append(missingDataKeys,
&missingDataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, true})
missingDataKeys = append(missingDataKeys,
&missingDataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, false})
&missingDataKey{
nsCollBlk: nsCollBlk{
ns: ns,
coll: coll,
blkNum: expiryEntry.key.committingBlk,
},
})
}
}
return

return dataKeys, missingDataKeys
}

func passesFilter(dataKey *dataKey, filter ledger.PvtNsCollFilter) bool {
Expand Down
147 changes: 80 additions & 67 deletions core/ledger/pvtdatastorage/kv_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,30 @@ import (
)

var (
pendingCommitKey = []byte{0}
lastCommittedBlkkey = []byte{1}
pvtDataKeyPrefix = []byte{2}
expiryKeyPrefix = []byte{3}
eligibleMissingDataKeyPrefix = []byte{4}
ineligibleMissingDataKeyPrefix = []byte{5}
collElgKeyPrefix = []byte{6}
lastUpdatedOldBlocksKey = []byte{7}
pendingCommitKey = []byte{0}
lastCommittedBlkkey = []byte{1}
pvtDataKeyPrefix = []byte{2}
expiryKeyPrefix = []byte{3}
elgPrioritizedMissingDataGroup = []byte{4}
inelgMissingDataGroup = []byte{5}
collElgKeyPrefix = []byte{6}
lastUpdatedOldBlocksKey = []byte{7}
elgDeprioritizedMissingDataGroup = []byte{8}

nilByte = byte(0)
emptyValue = []byte{}
)

func getDataKeysForRangeScanByBlockNum(blockNum uint64) (startKey, endKey []byte) {
startKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...)
endKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum+1, 0).ToBytes()...)
return
func getDataKeysForRangeScanByBlockNum(blockNum uint64) ([]byte, []byte) {
startKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...)
endKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum+1, 0).ToBytes()...)
return startKey, endKey
}

func getExpiryKeysForRangeScan(minBlkNum, maxBlkNum uint64) (startKey, endKey []byte) {
startKey = append(expiryKeyPrefix, version.NewHeight(minBlkNum, 0).ToBytes()...)
endKey = append(expiryKeyPrefix, version.NewHeight(maxBlkNum+1, 0).ToBytes()...)
return
func getExpiryKeysForRangeScan(minBlkNum, maxBlkNum uint64) ([]byte, []byte) {
startKey := append(expiryKeyPrefix, version.NewHeight(minBlkNum, 0).ToBytes()...)
endKey := append(expiryKeyPrefix, version.NewHeight(maxBlkNum+1, 0).ToBytes()...)
return startKey, endKey
}

func encodeLastCommittedBlockVal(blockNum uint64) []byte {
Expand Down Expand Up @@ -107,47 +108,52 @@ func decodeDataValue(datavalueBytes []byte) (*rwset.CollectionPvtReadWriteSet, e
return collPvtdata, err
}

func encodeMissingDataKey(key *missingDataKey) []byte {
if key.isEligible {
// When missing pvtData reconciler asks for missing data info,
// it is necessary to pass the missing pvtdata info associated with
// the most recent block so that missing pvtdata in the state db can
// be fixed sooner to reduce the "private data matching public hash version
// is not available" error during endorserments. In order to give priority
// to missing pvtData in the most recent block, we use reverse order
// preserving encoding for the missing data key. This simplifies the
// implementation of GetMissingPvtDataInfoForMostRecentBlocks().
keyBytes := append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(key.blkNum)...)
keyBytes = append(keyBytes, []byte(key.ns)...)
keyBytes = append(keyBytes, nilByte)
return append(keyBytes, []byte(key.coll)...)
}
func encodeElgPrioMissingDataKey(key *missingDataKey) []byte {
// When missing pvtData reconciler asks for missing data info,
// it is necessary to pass the missing pvtdata info associated with
// the most recent block so that missing pvtdata in the state db can
// be fixed sooner to reduce the "private data matching public hash version
// is not available" error during endorserments. In order to give priority
// to missing pvtData in the most recent block, we use reverse order
// preserving encoding for the missing data key. This simplifies the
// implementation of GetMissingPvtDataInfoForMostRecentBlocks().
encKey := append(elgPrioritizedMissingDataGroup, encodeReverseOrderVarUint64(key.blkNum)...)
encKey = append(encKey, []byte(key.ns)...)
encKey = append(encKey, nilByte)
return append(encKey, []byte(key.coll)...)
}

keyBytes := append(ineligibleMissingDataKeyPrefix, []byte(key.ns)...)
keyBytes = append(keyBytes, nilByte)
keyBytes = append(keyBytes, []byte(key.coll)...)
keyBytes = append(keyBytes, nilByte)
return append(keyBytes, []byte(encodeReverseOrderVarUint64(key.blkNum))...)
func encodeElgDeprioMissingDataKey(key *missingDataKey) []byte {
encKey := append(elgDeprioritizedMissingDataGroup, encodeReverseOrderVarUint64(key.blkNum)...)
encKey = append(encKey, []byte(key.ns)...)
encKey = append(encKey, nilByte)
return append(encKey, []byte(key.coll)...)
}

func decodeMissingDataKey(keyBytes []byte) *missingDataKey {
func decodeElgMissingDataKey(keyBytes []byte) *missingDataKey {
key := &missingDataKey{nsCollBlk: nsCollBlk{}}
if keyBytes[0] == eligibleMissingDataKeyPrefix[0] {
blkNum, numBytesConsumed := decodeReverseOrderVarUint64(keyBytes[1:])

splittedKey := bytes.Split(keyBytes[numBytesConsumed+1:], []byte{nilByte})
key.ns = string(splittedKey[0])
key.coll = string(splittedKey[1])
key.blkNum = blkNum
key.isEligible = true
return key
}
blkNum, numBytesConsumed := decodeReverseOrderVarUint64(keyBytes[1:])
splittedKey := bytes.Split(keyBytes[numBytesConsumed+1:], []byte{nilByte})
key.ns = string(splittedKey[0])
key.coll = string(splittedKey[1])
key.blkNum = blkNum
return key
}

func encodeInelgMissingDataKey(key *missingDataKey) []byte {
encKey := append(inelgMissingDataGroup, []byte(key.ns)...)
encKey = append(encKey, nilByte)
encKey = append(encKey, []byte(key.coll)...)
encKey = append(encKey, nilByte)
return append(encKey, []byte(encodeReverseOrderVarUint64(key.blkNum))...)
}

func decodeInelgMissingDataKey(keyBytes []byte) *missingDataKey {
key := &missingDataKey{nsCollBlk: nsCollBlk{}}
splittedKey := bytes.SplitN(keyBytes[1:], []byte{nilByte}, 3) //encoded bytes for blknum may contain empty bytes
key.ns = string(splittedKey[0])
key.coll = string(splittedKey[1])
key.blkNum, _ = decodeReverseOrderVarUint64(splittedKey[2])
key.isEligible = false
return key
}

Expand Down Expand Up @@ -184,44 +190,51 @@ func decodeCollElgVal(b []byte) (*CollElgInfo, error) {
return m, nil
}

func createRangeScanKeysForEligibleMissingDataEntries(blkNum uint64) (startKey, endKey []byte) {
startKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(blkNum)...)
endKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(0)...)
func createRangeScanKeysForElgMissingData(blkNum uint64, group []byte) ([]byte, []byte) {
startKey := append(group, encodeReverseOrderVarUint64(blkNum)...)
endKey := append(group, encodeReverseOrderVarUint64(0)...)

return startKey, endKey
}

func createRangeScanKeysForIneligibleMissingData(maxBlkNum uint64, ns, coll string) (startKey, endKey []byte) {
startKey = encodeMissingDataKey(
func createRangeScanKeysForInelgMissingData(maxBlkNum uint64, ns, coll string) ([]byte, []byte) {
startKey := encodeInelgMissingDataKey(
&missingDataKey{
nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: maxBlkNum},
isEligible: false,
nsCollBlk: nsCollBlk{
ns: ns,
coll: coll,
blkNum: maxBlkNum,
},
},
)
endKey = encodeMissingDataKey(
endKey := encodeInelgMissingDataKey(
&missingDataKey{
nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: 0},
isEligible: false,
nsCollBlk: nsCollBlk{
ns: ns,
coll: coll,
blkNum: 0,
},
},
)
return

return startKey, endKey
}

func createRangeScanKeysForCollElg() (startKey, endKey []byte) {
return encodeCollElgKey(math.MaxUint64),
encodeCollElgKey(0)
}

func datakeyRange(blockNum uint64) (startKey, endKey []byte) {
startKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...)
endKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, math.MaxUint64).ToBytes()...)
return
func datakeyRange(blockNum uint64) ([]byte, []byte) {
startKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...)
endKey := append(pvtDataKeyPrefix, version.NewHeight(blockNum, math.MaxUint64).ToBytes()...)
return startKey, endKey
}

func eligibleMissingdatakeyRange(blkNum uint64) (startKey, endKey []byte) {
startKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(blkNum)...)
endKey = append(eligibleMissingDataKeyPrefix, encodeReverseOrderVarUint64(blkNum-1)...)
return
func eligibleMissingdatakeyRange(blkNum uint64) ([]byte, []byte) {
startKey := append(elgPrioritizedMissingDataGroup, encodeReverseOrderVarUint64(blkNum)...)
endKey := append(elgPrioritizedMissingDataGroup, encodeReverseOrderVarUint64(blkNum-1)...)
return startKey, endKey
}

// encodeReverseOrderVarUint64 returns a byte-representation for a uint64 number such that
Expand Down
Loading

0 comments on commit f108f93

Please sign in to comment.