Skip to content

Commit

Permalink
refactor pvtdatastore
Browse files Browse the repository at this point in the history
Signed-off-by: senthil <cendhu@gmail.com>
  • Loading branch information
cendhu authored and denyeart committed Sep 16, 2020
1 parent 659fe39 commit 7eaead1
Showing 1 changed file with 97 additions and 132 deletions.
229 changes: 97 additions & 132 deletions core/ledger/pvtdatastorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,8 @@ type storeEntries struct {
type lastUpdatedOldBlocksList []uint64

type entriesForPvtDataOfOldBlocks struct {
// for each <ns, coll, blkNum, txNum>, store the dataEntry, i.e., pvtData
dataEntries map[dataKey]*rwset.CollectionPvtReadWriteSet
// store the retrieved (& updated) expiryData in expiryEntries
expiryEntries map[expiryKey]*ExpiryData
// for each <ns, coll, blkNum>, store the retrieved (& updated) bitmap in the missingDataEntries
dataEntries map[dataKey]*rwset.CollectionPvtReadWriteSet
expiryEntries map[expiryKey]*ExpiryData
missingDataEntries map[nsCollBlk]*bitset.BitSet
}

Expand Down Expand Up @@ -263,93 +260,81 @@ func (s *Store) Commit(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtD

// CommitPvtDataOfOldBlocks commits the pvtData (i.e., previously missing data) of old blocks.
// The parameter `blocksPvtData` refers a list of old block's pvtdata which are missing in the pvtstore.
// Given a list of old block's pvtData, `CommitPvtDataOfOldBlocks` performs the following four
// Given a list of old block's pvtData, `CommitPvtDataOfOldBlocks` performs the following three
// operations
// (1) construct dataEntries for all pvtData
// (2) construct update entries (i.e., dataEntries, expiryEntries, missingDataEntries)
// (1) construct update entries (i.e., dataEntries, expiryEntries, missingDataEntries)
// from the above created data entries
// (3) create a db update batch from the update entries
// (4) commit the update batch to the pvtStore
// (2) create a db update batch from the update entries
// (3) commit the update batch to the pvtStore
func (s *Store) CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error {
if s.isLastUpdatedOldBlocksSet {
return &ErrIllegalCall{`The lastUpdatedOldBlocksList is set. It means that the
stateDB may not be in sync with the pvtStore`}
}

logger.Debugf("Constructing pvtdatastore entries for pvtData of [%d] old blocks", len(blocksPvtData))
updateEntries, err := s.constructUpdateEntries(blocksPvtData)
entries, err := s.constructEntries(blocksPvtData)
if err != nil {
return err
}

logger.Debug("Constructing update batch from pvtdatastore entries")
batch, err := s.constructUpdateBatch(updateEntries)
if err != nil {
batch := s.db.NewUpdateBatch()
if err := entries.addToUpdateBatch(batch); err != nil {
return err
}

logger.Debug("Committing the update batch to pvtdatastore")
if err := s.commitBatch(batch); err != nil {
return err
}

return nil
return s.db.WriteBatch(batch, true)
}

func (s *Store) constructUpdateEntries(blocksPvtData map[uint64][]*ledger.TxPvtData) (*entriesForPvtDataOfOldBlocks, error) {
func (s *Store) constructEntries(blocksPvtData map[uint64][]*ledger.TxPvtData) (*entriesForPvtDataOfOldBlocks, error) {
var dataEntries []*dataEntry
for blkNum, pvtData := range blocksPvtData {
dataEntries = append(dataEntries, prepareDataEntries(blkNum, pvtData)...)
}

updateEntries := &entriesForPvtDataOfOldBlocks{
entries := &entriesForPvtDataOfOldBlocks{
dataEntries: make(map[dataKey]*rwset.CollectionPvtReadWriteSet),
expiryEntries: make(map[expiryKey]*ExpiryData),
missingDataEntries: make(map[nsCollBlk]*bitset.BitSet)}
missingDataEntries: make(map[nsCollBlk]*bitset.BitSet),
}

// for each data entry, first, get the expiryData and missingData from the pvtStore.
// Second, update the expiryData and missingData as per the data entry. Finally, add
// the data entry along with the updated expiryData and missingData to the update entries
for _, dataEntry := range dataEntries {
// get the expiryBlk number to construct the expiryKey
expiryKey, err := s.constructExpiryKeyFromDataEntry(dataEntry)
var expData *ExpiryData
nsCollBlk := dataEntry.key.nsCollBlk
txNum := dataEntry.key.txNum

expKey, err := s.constructExpiryKeyFromDataEntry(dataEntry)
if err != nil {
return nil, err
}

// get the existing expiryData entry
var expiryData *ExpiryData
if !neverExpires(expiryKey.expiringBlk) {
if expiryData, err = s.getExpiryDataFromUpdateEntriesOrStore(updateEntries, expiryKey); err != nil {
if !neverExpires(expKey.expiringBlk) {
if expData, err = s.getExpiryDataFromEntriesOrStore(entries, expKey); err != nil {
return nil, err
}
if expiryData == nil {
if expData == nil {
// data entry is already expired
// and purged (a rare scenario)
continue
}
expData.addPresentData(nsCollBlk.ns, nsCollBlk.coll, txNum)
}

// get the existing missingData entry
var missingData *bitset.BitSet
nsCollBlk := dataEntry.key.nsCollBlk
if missingData, err = s.getMissingDataFromUpdateEntriesOrStore(updateEntries, nsCollBlk); err != nil {
if missingData, err = s.getMissingDataFromEntriesOrStore(entries, nsCollBlk); err != nil {
return nil, err
}
if missingData == nil {
// data entry is already expired
// and purged (a rare scenario)
continue
}
missingData.Clear(uint(txNum))

updateEntries.addDataEntry(dataEntry)
if expiryData != nil { // would be nil for the never expiring entry
expiryEntry := &expiryEntry{&expiryKey, expiryData}
updateEntries.updateAndAddExpiryEntry(expiryEntry, dataEntry.key)
}
updateEntries.updateAndAddMissingDataEntry(missingData, dataEntry.key)
entries.add(dataEntry, expKey, expData, missingData)
}
return updateEntries, nil
return entries, nil
}

func (s *Store) constructExpiryKeyFromDataEntry(dataEntry *dataEntry) (expiryKey, error) {
Expand All @@ -359,140 +344,120 @@ func (s *Store) constructExpiryKeyFromDataEntry(dataEntry *dataEntry) (expiryKey
if err != nil {
return expiryKey{}, err
}
return expiryKey{expiringBlk, nsCollBlk.blkNum}, nil

return expiryKey{
expiringBlk: expiringBlk,
committingBlk: nsCollBlk.blkNum,
}, nil
}

func (s *Store) getExpiryDataFromUpdateEntriesOrStore(updateEntries *entriesForPvtDataOfOldBlocks, expiryKey expiryKey) (*ExpiryData, error) {
expiryData, ok := updateEntries.expiryEntries[expiryKey]
if !ok {
var err error
expiryData, err = s.getExpiryDataOfExpiryKey(&expiryKey)
if err != nil {
return nil, err
}
func (s *Store) getExpiryDataFromEntriesOrStore(updateEntries *entriesForPvtDataOfOldBlocks, expiryKey expiryKey) (*ExpiryData, error) {
if expiryData, ok := updateEntries.expiryEntries[expiryKey]; ok {
return expiryData, nil
}

expiryData, err := s.getExpiryDataOfExpiryKey(&expiryKey)
if err != nil {
return nil, err
}
return expiryData, nil
}

func (s *Store) getMissingDataFromUpdateEntriesOrStore(updateEntries *entriesForPvtDataOfOldBlocks, nsCollBlk nsCollBlk) (*bitset.BitSet, error) {
missingData, ok := updateEntries.missingDataEntries[nsCollBlk]
if !ok {
var err error
missingDataKey := &missingDataKey{nsCollBlk, true}
missingData, err = s.getBitmapOfMissingDataKey(missingDataKey)
if err != nil {
return nil, err
}
func (s *Store) getMissingDataFromEntriesOrStore(updateEntries *entriesForPvtDataOfOldBlocks, nsCollBlk nsCollBlk) (*bitset.BitSet, error) {
if missingData, ok := updateEntries.missingDataEntries[nsCollBlk]; ok {
return missingData, nil
}
return missingData, nil
}

func (updateEntries *entriesForPvtDataOfOldBlocks) addDataEntry(dataEntry *dataEntry) {
dataKey := dataKey{dataEntry.key.nsCollBlk, dataEntry.key.txNum}
updateEntries.dataEntries[dataKey] = dataEntry.value
missingDataKey := &missingDataKey{
nsCollBlk: nsCollBlk,
isEligible: true,
}
missingData, err := s.getBitmapOfMissingDataKey(missingDataKey)
if err != nil {
return nil, err
}
return missingData, nil
}

func (updateEntries *entriesForPvtDataOfOldBlocks) updateAndAddExpiryEntry(expiryEntry *expiryEntry, dataKey *dataKey) {
txNum := dataKey.txNum
nsCollBlk := dataKey.nsCollBlk
// update
expiryEntry.value.addPresentData(nsCollBlk.ns, nsCollBlk.coll, txNum)
// we cannot delete entries from MissingDataMap as
// we keep only one entry per missing <ns-col>
// irrespective of the number of txNum.
func (e *entriesForPvtDataOfOldBlocks) add(datEntry *dataEntry, expKey expiryKey, expData *ExpiryData, missingData *bitset.BitSet) {
dataKey := dataKey{
nsCollBlk: datEntry.key.nsCollBlk,
txNum: datEntry.key.txNum,
}
e.dataEntries[dataKey] = datEntry.value

// add
expiryKey := expiryKey{expiryEntry.key.expiringBlk, expiryEntry.key.committingBlk}
updateEntries.expiryEntries[expiryKey] = expiryEntry.value
}
if expData != nil {
e.expiryEntries[expKey] = expData
}

func (updateEntries *entriesForPvtDataOfOldBlocks) updateAndAddMissingDataEntry(missingData *bitset.BitSet, dataKey *dataKey) {
txNum := dataKey.txNum
nsCollBlk := dataKey.nsCollBlk
// update
missingData.Clear(uint(txNum))
// add
updateEntries.missingDataEntries[nsCollBlk] = missingData
e.missingDataEntries[dataKey.nsCollBlk] = missingData
}

func (s *Store) constructUpdateBatch(updateEntries *entriesForPvtDataOfOldBlocks) (*leveldbhelper.UpdateBatch, error) {
batch := s.db.NewUpdateBatch()

// add the following four types of entries to the update batch: (1) new data entries
// (i.e., pvtData), (2) updated expiry entries, (3) updated missing data entries, and
// (4) updated block list

// (1) add new data entries to the batch
if err := addNewDataEntriesToUpdateBatch(batch, updateEntries); err != nil {
return nil, err
}

// (2) add updated expiryEntry to the batch
if err := addUpdatedExpiryEntriesToUpdateBatch(batch, updateEntries); err != nil {
return nil, err
func (e *entriesForPvtDataOfOldBlocks) addToUpdateBatch(batch *leveldbhelper.UpdateBatch) error {
if err := e.addDataEntriesToUpdateBatch(batch); err != nil {
return err
}

// (3) add updated missingData to the batch
if err := addUpdatedMissingDataEntriesToUpdateBatch(batch, updateEntries); err != nil {
return nil, err
if err := e.addExpiryEntriesToUpdateBatch(batch); err != nil {
return err
}

return batch, nil
return e.addMissingDataEntriesToUpdateBatch(batch)
}

func addNewDataEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch, entries *entriesForPvtDataOfOldBlocks) error {
var keyBytes, valBytes []byte
func (e *entriesForPvtDataOfOldBlocks) addDataEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch) error {
var key, val []byte
var err error
for dataKey, pvtData := range entries.dataEntries {
keyBytes = encodeDataKey(&dataKey)
if valBytes, err = encodeDataValue(pvtData); err != nil {

for dataKey, pvtData := range e.dataEntries {
key = encodeDataKey(&dataKey)
if val, err = encodeDataValue(pvtData); err != nil {
return err
}
batch.Put(keyBytes, valBytes)
batch.Put(key, val)
}
return nil
}

func addUpdatedExpiryEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch, entries *entriesForPvtDataOfOldBlocks) error {
var keyBytes, valBytes []byte
func (e *entriesForPvtDataOfOldBlocks) addExpiryEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch) error {
var key, val []byte
var err error
for expiryKey, expiryData := range entries.expiryEntries {
keyBytes = encodeExpiryKey(&expiryKey)
if valBytes, err = encodeExpiryValue(expiryData); err != nil {

for expiryKey, expiryData := range e.expiryEntries {
key = encodeExpiryKey(&expiryKey)
if val, err = encodeExpiryValue(expiryData); err != nil {
return err
}
batch.Put(keyBytes, valBytes)
batch.Put(key, val)
}
return nil
}

func addUpdatedMissingDataEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch, entries *entriesForPvtDataOfOldBlocks) error {
var keyBytes, valBytes []byte
func (e *entriesForPvtDataOfOldBlocks) addMissingDataEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch) error {
var key, val []byte
var err error
for nsCollBlk, missingData := range entries.missingDataEntries {
keyBytes = encodeMissingDataKey(&missingDataKey{nsCollBlk, true})
// if the missingData is empty, we need to delete the missingDataKey

for nsCollBlk, missingData := range e.missingDataEntries {
key = encodeMissingDataKey(
&missingDataKey{
nsCollBlk: nsCollBlk,
isEligible: true,
},
)

if missingData.None() {
batch.Delete(keyBytes)
batch.Delete(key)
continue
}
if valBytes, err = encodeMissingDataValue(missingData); err != nil {

if val, err = encodeMissingDataValue(missingData); err != nil {
return err
}
batch.Put(keyBytes, valBytes)
batch.Put(key, val)
}
return nil
}

func (s *Store) commitBatch(batch *leveldbhelper.UpdateBatch) error {
// commit the batch to the store
if err := s.db.WriteBatch(batch, true); err != nil {
return err
}

return nil
}

// GetLastUpdatedOldBlocksPvtData returns the pvtdata of blocks listed in `lastUpdatedOldBlocksList`
// TODO FAB-16293 -- GetLastUpdatedOldBlocksPvtData() can be removed either in v2.0 or in v2.1.
// If we decide to rebuild stateDB in v2.0, by default, the rebuild logic would take
Expand Down

0 comments on commit 7eaead1

Please sign in to comment.