Skip to content

Commit

Permalink
Use directly leveldb batch (#1507)
Browse files Browse the repository at this point in the history
This commit improves the leveldbhelper.UpdateBatch by
replacing the map that it maintains by the actual leveldb batch.

This reduces the memory consumed and a sorting of key-values is avoided
if the data added is already in sorted order (such as loading the data
from snapshot files)

Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi authored and ale-linux committed Sep 8, 2020
1 parent 99332da commit bebe131
Show file tree
Hide file tree
Showing 16 changed files with 66 additions and 71 deletions.
6 changes: 3 additions & 3 deletions common/ledger/blkstorage/blockindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
blkNum := blockIdxInfo.blockNum
blkHash := blockIdxInfo.blockHash
txsfltr := txflags.ValidationFlags(blockIdxInfo.metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
batch := leveldbhelper.NewUpdateBatch()
batch := index.db.NewUpdateBatch()
flpBytes, err := flp.marshal()
if err != nil {
return err
Expand Down Expand Up @@ -343,7 +343,7 @@ func importTxIDsFromSnapshot(
lastBlockNumInSnapshot uint64,
db *leveldbhelper.DBHandle) error {

batch := leveldbhelper.NewUpdateBatch()
batch := db.NewUpdateBatch()
txIDsMetadata, err := snapshot.OpenFile(filepath.Join(snapshotDir, snapshotMetadataFileName), snapshotFileFormat)
if err != nil {
return err
Expand All @@ -369,7 +369,7 @@ func importTxIDsFromSnapshot(
if err := db.WriteBatch(batch, true); err != nil {
return err
}
batch = leveldbhelper.NewUpdateBatch()
batch = db.NewUpdateBatch()
}
}
batch.Put(indexSavePointKey, encodeBlockNum(lastBlockNumInSnapshot))
Expand Down
2 changes: 1 addition & 1 deletion common/ledger/blkstorage/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (r *rollbackMgr) deleteIndexEntriesRange(startBlkNum, endBlkNum uint64) err
// entries. However, if there is more than more than 1 channel, dropping of
// index would impact the time taken to recover the peer. We need to analyze
// a bit before making a decision on rollback vs drop of index. FAB-15672
batch := leveldbhelper.NewUpdateBatch()
batch := r.indexStore.db.NewUpdateBatch()
lp, err := r.indexStore.getBlockLocByBlockNum(startBlkNum)
if err != nil {
return err
Expand Down
42 changes: 16 additions & 26 deletions common/ledger/util/leveldbhelper/leveldb_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,21 +204,20 @@ func (h *DBHandle) DeleteAll() error {
return nil
}

// NewUpdateBatch returns a new UpdateBatch that can be used to update the db
func (h *DBHandle) NewUpdateBatch() *UpdateBatch {
return &UpdateBatch{
dbName: h.dbName,
Batch: &leveldb.Batch{},
}
}

// WriteBatch writes a batch in an atomic way
func (h *DBHandle) WriteBatch(batch *UpdateBatch, sync bool) error {
if len(batch.KVs) == 0 {
if batch == nil || batch.Len() == 0 {
return nil
}
levelBatch := &leveldb.Batch{}
for k, v := range batch.KVs {
key := constructLevelKey(h.dbName, []byte(k))
if v == nil {
levelBatch.Delete(key)
} else {
levelBatch.Put(key, v)
}
}
if err := h.db.WriteBatch(levelBatch, sync); err != nil {
if err := h.db.WriteBatch(batch.Batch, sync); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -252,30 +251,21 @@ func (h *DBHandle) Close() {

// UpdateBatch encloses the details of multiple `updates`
type UpdateBatch struct {
KVs map[string][]byte
}

// NewUpdateBatch constructs an instance of a Batch
func NewUpdateBatch() *UpdateBatch {
return &UpdateBatch{make(map[string][]byte)}
*leveldb.Batch
dbName string
}

// Put adds a KV
func (batch *UpdateBatch) Put(key []byte, value []byte) {
func (b *UpdateBatch) Put(key []byte, value []byte) {
if value == nil {
panic("Nil value not allowed")
}
batch.KVs[string(key)] = value
b.Batch.Put(constructLevelKey(b.dbName, key), value)
}

// Delete deletes a Key and associated value
func (batch *UpdateBatch) Delete(key []byte) {
batch.KVs[string(key)] = nil
}

// Len returns the number of entries in the batch
func (batch *UpdateBatch) Len() int {
return len(batch.KVs)
func (b *UpdateBatch) Delete(key []byte) {
b.Batch.Delete(constructLevelKey(b.dbName, key))
}

// Iterator extends actual leveldb iterator
Expand Down
4 changes: 2 additions & 2 deletions common/ledger/util/leveldbhelper/leveldb_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,15 @@ func TestBatchedUpdates(t *testing.T) {

dbs := []*DBHandle{db1, db2}
for _, db := range dbs {
batch := NewUpdateBatch()
batch := db.NewUpdateBatch()
batch.Put([]byte("key1"), []byte("value1"))
batch.Put([]byte("key2"), []byte("value2"))
batch.Put([]byte("key3"), []byte("value3"))
db.WriteBatch(batch, true)
}

for _, db := range dbs {
batch := NewUpdateBatch()
batch := db.NewUpdateBatch()
batch.Delete([]byte("key2"))
db.WriteBatch(batch, true)
}
Expand Down
4 changes: 2 additions & 2 deletions core/ledger/confighistory/db_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func newDBProvider(dbPath string) (*dbProvider, error) {
return &dbProvider{Provider: p}, nil
}

func newBatch() *batch {
return &batch{leveldbhelper.NewUpdateBatch()}
func (d *db) newBatch() *batch {
return &batch{d.DBHandle.NewUpdateBatch()}
}

func (p *dbProvider) getDB(id string) *db {
Expand Down
2 changes: 1 addition & 1 deletion core/ledger/confighistory/db_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func verifyNsEntries(t *testing.T, nsItr *leveldbhelper.Iterator, expectedEntrie
require.Equal(t, expectedEntries, retrievedEntries)
}
func populateDBWithSampleData(t *testing.T, db *db, sampledata []*compositeKV) {
batch := newBatch()
batch := db.newBatch()
for _, data := range sampledata {
batch.add(data.ns, data.key, data.blockNum, data.value)
}
Expand Down
17 changes: 8 additions & 9 deletions core/ledger/confighistory/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/snapshot"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/core/ledger"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -100,11 +99,12 @@ func (m *Mgr) HandleStateUpdates(trigger *ledger.StateUpdateTrigger) error {
if len(updatedCollConfigs) == 0 {
return nil
}
batch, err := prepareDBBatch(updatedCollConfigs, trigger.CommittingBlockNum)
dbHandle := m.dbProvider.getDB(trigger.LedgerID)
batch := dbHandle.newBatch()
err = prepareDBBatch(batch, updatedCollConfigs, trigger.CommittingBlockNum)
if err != nil {
return err
}
dbHandle := m.dbProvider.getDB(trigger.LedgerID)
return dbHandle.writeBatch(batch, true)
}

Expand Down Expand Up @@ -135,7 +135,7 @@ func (m *Mgr) ImportConfigHistory(ledgerID string, dir string) error {
return err
}

batch := leveldbhelper.NewUpdateBatch()
batch := db.NewUpdateBatch()
currentBatchSize := 0
for i := uint64(0); i < numCollectionConfigs; i++ {
key, err := collectionConfigData.DecodeBytes()
Expand All @@ -152,7 +152,7 @@ func (m *Mgr) ImportConfigHistory(ledgerID string, dir string) error {
if err := db.WriteBatch(batch, true); err != nil {
return err
}
batch = leveldbhelper.NewUpdateBatch()
batch = db.NewUpdateBatch()
}
}
return db.WriteBatch(batch, true)
Expand Down Expand Up @@ -294,18 +294,17 @@ func (r *Retriever) getImplicitCollection(chaincodeName string) ([]*peer.StaticC
return r.deployedCCInfoProvider.ImplicitCollections(r.ledgerID, chaincodeName, qe)
}

func prepareDBBatch(chaincodeCollConfigs map[string]*peer.CollectionConfigPackage, committingBlockNum uint64) (*batch, error) {
batch := newBatch()
func prepareDBBatch(batch *batch, chaincodeCollConfigs map[string]*peer.CollectionConfigPackage, committingBlockNum uint64) error {
for ccName, collConfig := range chaincodeCollConfigs {
key := constructCollectionConfigKey(ccName)
var configBytes []byte
var err error
if configBytes, err = proto.Marshal(collConfig); err != nil {
return nil, errors.WithStack(err)
return errors.WithStack(err)
}
batch.add(collectionConfigNamespace, key, committingBlockNum, configBytes)
}
return batch, nil
return nil
}

func compositeKVToCollectionConfig(compositeKV *compositeKV) (*ledger.CollectionConfigInfo, error) {
Expand Down
19 changes: 13 additions & 6 deletions core/ledger/confighistory/mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,16 +195,17 @@ func TestWithImplicitColls(t *testing.T) {
ccInfoProvider: mockCCInfoProvider,
dbProvider: p,
}

dbHandle := mgr.dbProvider.getDB("ledger1")
batch := dbHandle.newBatch()
// add explicit collections at height 20
batch, err := prepareDBBatch(
err = prepareDBBatch(
batch,
map[string]*peer.CollectionConfigPackage{
"chaincode1": collConfigPackage,
},
20,
)
require.NoError(t, err)
dbHandle := mgr.dbProvider.getDB("ledger1")
require.NoError(t, dbHandle.writeBatch(batch, true))

onlyImplicitCollections := testutilCreateCollConfigPkg(
Expand Down Expand Up @@ -342,7 +343,9 @@ func TestExportAndImportConfigHistory(t *testing.T) {
}

db := env.mgr.dbProvider.getDB(ledgerID)
batch, err := prepareDBBatch(
batch := db.newBatch()
err := prepareDBBatch(
batch,
map[string]*peer.CollectionConfigPackage{
"chaincode1": cc1CollConfigPackage,
"chaincode2": cc2CollConfigPackage,
Expand All @@ -353,7 +356,9 @@ func TestExportAndImportConfigHistory(t *testing.T) {
require.NoError(t, err)
require.NoError(t, db.writeBatch(batch, true))

batch, err = prepareDBBatch(
batch = db.newBatch()
err = prepareDBBatch(
batch,
map[string]*peer.CollectionConfigPackage{
"chaincode1": cc1CollConfigPackageNew,
"chaincode2": cc2CollConfigPackageNew,
Expand Down Expand Up @@ -555,7 +560,9 @@ func TestExportConfigHistoryErrorCase(t *testing.T) {

db := env.mgr.dbProvider.getDB("ledger1")
cc1collConfigPackage := testutilCreateCollConfigPkg([]string{"Explicit-cc1-coll-1", "Explicit-cc1-coll-2"})
batch, err := prepareDBBatch(
batch := db.newBatch()
err := prepareDBBatch(
batch,
map[string]*peer.CollectionConfigPackage{
"chaincode1": cc1collConfigPackage,
},
Expand Down
2 changes: 1 addition & 1 deletion core/ledger/kvledger/history/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (d *DB) Commit(block *common.Block) error {
//Set the starting tranNo to 0
var tranNo uint64

dbBatch := leveldbhelper.NewUpdateBatch()
dbBatch := d.levelDB.NewUpdateBatch()

logger.Debugf("Channel [%s]: Updating history database for blockNo [%v] with [%d] transactions",
d.name, blockNo, len(block.Data.Data))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (h *metadataHint) metadataEverUsedFor(namespace string) bool {
}

func (h *metadataHint) setMetadataUsedFlag(updates *UpdateBatch) {
batch := leveldbhelper.NewUpdateBatch()
batch := h.bookkeeper.NewUpdateBatch()
for ns := range filterNamespacesThatHasMetadata(updates) {
if h.cache[ns] {
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func newExpiryKeeper(ledgerid string, provider bookkeeping.Provider) *expiryKeep
// at the time of the commit of the block number 45 and the second entry was created at the time of the commit of the block number 40, however
// both are expiring with the commit of block number 50.
func (ek *expiryKeeper) update(toTrack []*expiryInfo, toClear []*expiryInfoKey) error {
updateBatch := leveldbhelper.NewUpdateBatch()
updateBatch := ek.db.NewUpdateBatch()
for _, expinfo := range toTrack {
k, v, err := encodeKV(expinfo)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (vdb *versionedDB) ExecuteQueryWithPagination(namespace, query, bookmark st

// ApplyUpdates implements method in VersionedDB interface
func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
dbBatch := leveldbhelper.NewUpdateBatch()
dbBatch := vdb.db.NewUpdateBatch()
namespaces := batch.GetUpdatedNamespaces()
for _, ns := range namespaces {
updates := batch.GetUpdates(ns)
Expand Down
20 changes: 10 additions & 10 deletions core/ledger/pvtdatastorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (s *Store) initState() error {

if batchPending {
committingBlockNum := s.nextBlockNum()
batch := leveldbhelper.NewUpdateBatch()
batch := s.db.NewUpdateBatch()
batch.Put(lastCommittedBlkkey, encodeLastCommittedBlockVal(committingBlockNum))
batch.Delete(pendingCommitKey)
if err := s.db.WriteBatch(batch, true); err != nil {
Expand Down Expand Up @@ -214,7 +214,7 @@ func (s *Store) Commit(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtD
return &ErrIllegalArgs{fmt.Sprintf("Expected block number=%d, received block number=%d", expectedBlockNum, blockNum)}
}

batch := leveldbhelper.NewUpdateBatch()
batch := s.db.NewUpdateBatch()
var err error
var keyBytes, valBytes []byte

Expand Down Expand Up @@ -288,7 +288,7 @@ func (s *Store) CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPv

// (3) create a db update batch from the update entries
logger.Debug("Constructing update batch from pvtdatastore entries")
batch, err := constructUpdateBatchFromUpdateEntries(updateEntries)
batch, err := s.constructUpdateBatchFromUpdateEntries(updateEntries)
if err != nil {
return err
}
Expand Down Expand Up @@ -427,8 +427,8 @@ func (updateEntries *entriesForPvtDataOfOldBlocks) updateAndAddMissingDataEntry(
updateEntries.missingDataEntries[nsCollBlk] = missingData
}

func constructUpdateBatchFromUpdateEntries(updateEntries *entriesForPvtDataOfOldBlocks) (*leveldbhelper.UpdateBatch, error) {
batch := leveldbhelper.NewUpdateBatch()
func (s *Store) constructUpdateBatchFromUpdateEntries(updateEntries *entriesForPvtDataOfOldBlocks) (*leveldbhelper.UpdateBatch, error) {
batch := s.db.NewUpdateBatch()

// add the following four types of entries to the update batch: (1) new data entries
// (i.e., pvtData), (2) updated expiry entries, (3) updated missing data entries, and
Expand Down Expand Up @@ -563,7 +563,7 @@ func (s *Store) getLastUpdatedOldBlocksList() ([]uint64, error) {

// ResetLastUpdatedOldBlocksList removes the `lastUpdatedOldBlocksList` entry from the store
func (s *Store) ResetLastUpdatedOldBlocksList() error {
batch := leveldbhelper.NewUpdateBatch()
batch := s.db.NewUpdateBatch()
batch.Delete(lastUpdatedOldBlocksKey)
if err := s.db.WriteBatch(batch, true); err != nil {
return err
Expand Down Expand Up @@ -734,7 +734,7 @@ func (s *Store) ProcessCollsEligibilityEnabled(committingBlk uint64, nsCollMap m
if err != nil {
return err
}
batch := leveldbhelper.NewUpdateBatch()
batch := s.db.NewUpdateBatch()
batch.Put(key, val)
if err = s.db.WriteBatch(batch, true); err != nil {
return err
Expand All @@ -760,7 +760,7 @@ func (s *Store) performPurgeIfScheduled(latestCommittedBlk uint64) {
}

func (s *Store) purgeExpiredData(minBlkNum, maxBlkNum uint64) error {
batch := leveldbhelper.NewUpdateBatch()
batch := s.db.NewUpdateBatch()
expiryEntries, err := s.retrieveExpiryEntries(minBlkNum, maxBlkNum)
if err != nil || len(expiryEntries) == 0 {
return err
Expand Down Expand Up @@ -836,7 +836,7 @@ func (s *Store) processCollElgEvents() error {
return err
}
defer eventItr.Release()
batch := leveldbhelper.NewUpdateBatch()
batch := s.db.NewUpdateBatch()
totalEntriesConverted := 0

for eventItr.Next() {
Expand Down Expand Up @@ -870,7 +870,7 @@ func (s *Store) processCollElgEvents() error {
collEntriesConverted++
if batch.Len() > s.maxBatchSize {
s.db.WriteBatch(batch, true)
batch = leveldbhelper.NewUpdateBatch()
batch = s.db.NewUpdateBatch()
sleepTime := time.Duration(s.batchesInterval)
logger.Infof("Going to sleep for %d milliseconds between batches. Entries for [ns=%s, coll=%s] converted so far = %d",
sleepTime, ns, coll, collEntriesConverted)
Expand Down
4 changes: 2 additions & 2 deletions core/ledger/pvtdatastorage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ func TestPendingBatch(t *testing.T) {
require := require.New(t)
s := env.TestStore
existingLastBlockNum := uint64(25)
batch := leveldbhelper.NewUpdateBatch()
batch := s.db.NewUpdateBatch()
batch.Put(lastCommittedBlkkey, encodeLastCommittedBlockVal(existingLastBlockNum))
require.NoError(s.db.WriteBatch(batch, true))
s.lastCommittedBlock = existingLastBlockNum
Expand All @@ -651,7 +651,7 @@ func TestPendingBatch(t *testing.T) {
// assume that a block has been prepared in v142 and the peer was
// killed for upgrade. When the pvtdataStore is opened again with
// v2.0 peer, the pendingBatch should be marked as committed.
batch = leveldbhelper.NewUpdateBatch()
batch = s.db.NewUpdateBatch()

// store pvtData entries
dataKey := &dataKey{nsCollBlk{"ns-1", "coll-1", 26}, 1}
Expand Down
Loading

0 comments on commit bebe131

Please sign in to comment.