Skip to content

Commit

Permalink
Ledger block cache
Browse files Browse the repository at this point in the history
This commit introduces an in-memory cache for the block storage of the ledger.
It caches new blocks that are committed and assumes blocks are committed in-order and with consecutive sequences.

The block iterators now attempt to retrieve the blocks from the cache if possible before going to the block storage.

The intent is twofold:
1) Speedup the block Deliver API by not doing disk I/O when clients (peers, orderers) fetch blocks.
2) Reduce the impact of the deliver API from writing new blocks into the ledger.

Signed-off-by: Yacov Manevich <yacov.manevich@ibm.com>
  • Loading branch information
yacovm authored and manish-sethi committed Sep 2, 2023
1 parent a8e078f commit 45cefb2
Show file tree
Hide file tree
Showing 4 changed files with 303 additions and 2 deletions.
6 changes: 5 additions & 1 deletion common/ledger/blkstorage/blockfile_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
blockfilePrefix = "blockfile_"
bootstrappingSnapshotInfoFile = "bootstrappingSnapshot.info"
bootstrappingSnapshotInfoTempFile = "bootstrappingSnapshotTemp.info"
defaultBlockCacheSizeBytes = 1024 * 1024 * 50
)

var blkMgrInfoKey = []byte("blkMgrInfo")
Expand All @@ -41,6 +42,7 @@ type blockfileMgr struct {
blkfilesInfoCond *sync.Cond
currentFileWriter *blockfileWriter
bcInfo atomic.Value
cache *cache
}

/*
Expand Down Expand Up @@ -96,7 +98,7 @@ func newBlockfileMgr(id string, conf *Conf, indexConfig *IndexConfig, indexStore
if err != nil {
panic(fmt.Sprintf("Error creating block storage root dir [%s]: %s", rootDir, err))
}
mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: indexStore}
mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: indexStore, cache: newCache(defaultMaxBlockfileSize)}

blockfilesInfo, err := mgr.loadBlkfilesInfo()
if err != nil {
Expand Down Expand Up @@ -328,6 +330,8 @@ func (mgr *blockfileMgr) addBlock(block *common.Block) error {
return errors.WithMessage(err, "error appending block to file")
}

defer mgr.cache.put(block, blockBytesLen)

// Update the blockfilesInfo with the results of adding the new block
currentBlkfilesInfo := mgr.blockfilesInfo
newBlkfilesInfo := &blockfilesInfo{
Expand Down
23 changes: 22 additions & 1 deletion common/ledger/blkstorage/blocks_itr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ type blocksItr struct {
stream *blockStream
closeMarker bool
closeMarkerLock *sync.Mutex
cachedPrev bool
}

func newBlockItr(mgr *blockfileMgr, startBlockNum uint64) *blocksItr {
mgr.blkfilesInfoCond.L.Lock()
defer mgr.blkfilesInfoCond.L.Unlock()
return &blocksItr{mgr, mgr.blockfilesInfo.lastPersistedBlock, startBlockNum, nil, false, &sync.Mutex{}}
return &blocksItr{mgr, mgr.blockfilesInfo.lastPersistedBlock, startBlockNum, nil, false, &sync.Mutex{}, false}
}

func (itr *blocksItr) waitForBlock(blockNum uint64) uint64 {
Expand Down Expand Up @@ -68,12 +69,32 @@ func (itr *blocksItr) Next() (ledger.QueryResult, error) {
if itr.closeMarker {
return nil, nil
}

cachedBlock, existsInCache := itr.mgr.cache.get(itr.blockNumToRetrieve)
if existsInCache {
logger.Debugf("Retrieved block %d from ledger in-memory cache", itr.blockNumToRetrieve)
itr.cachedPrev = true
itr.blockNumToRetrieve++
return cachedBlock, nil
}

if itr.cachedPrev {
itr.cachedPrev = false
if itr.stream != nil {
itr.stream.close()
itr.stream = nil
}
}

if itr.stream == nil {
logger.Debugf("Initializing block stream for iterator. itr.maxBlockNumAvailable=%d", itr.maxBlockNumAvailable)
if err := itr.initStream(); err != nil {
return nil, err
}
}

defer logger.Debugf("Retrieved block %d from ledger file storage", itr.blockNumToRetrieve)

nextBlockBytes, err := itr.stream.nextBlockBytes()
if err != nil {
return nil, err
Expand Down
105 changes: 105 additions & 0 deletions common/ledger/blkstorage/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package blkstorage

import (
"fmt"
"sync"

"github.com/hyperledger/fabric-protos-go/common"
)

const (
estimatedBlockSize = 512 * 1024
)

type cache struct {
cacheLock sync.RWMutex
disabled bool
cache map[uint64]cachedBlock
sizeBytes int
maxSeq uint64
maxSizeBytes int
}

type cachedBlock struct {
block *common.Block
blockSize int
}

func newCache(maxSizeBytes int) *cache {
isCacheDisabled := maxSizeBytes == 0

return &cache{
disabled: isCacheDisabled,
cache: make(map[uint64]cachedBlock, maxSizeBytes/estimatedBlockSize),
maxSizeBytes: maxSizeBytes,
}
}

func (c *cache) get(seq uint64) (*common.Block, bool) {
if c.disabled {
return nil, false
}

c.cacheLock.RLock()
defer c.cacheLock.RUnlock()

cachedBlock, exists := c.cache[seq]
return cachedBlock.block, exists
}

func (c *cache) put(block *common.Block, blockSize int) {
if c.disabled {
return
}

seq := block.Header.Number

if c.maxSeq > seq {
return
}

if c.maxSeq+1 < seq && c.maxSeq != 0 {
panic(fmt.Sprintf("detected out of order block insertion: attempted to insert block number %d but highest block is %d",
seq, c.maxSeq))
}

if c.maxSeq == seq && c.maxSeq != 0 {
panic(fmt.Sprintf("detected insertion of the same block (%d) twice", seq))
}

// Insert the block to the cache
c.maxSeq = seq

c.cacheLock.Lock()
defer c.cacheLock.Unlock()

c.sizeBytes += blockSize

c.cache[seq] = cachedBlock{block: block, blockSize: blockSize}

// If our cache is too big, evict the oldest block
for c.sizeBytes > c.maxSizeBytes {
c.evictOldestCachedBlock()
}
}

func (c *cache) evictOldestCachedBlock() {
cachedItemCount := len(c.cache)

// Given a series of k > 0 consecutive elements: {i, i+1, i+2, ... , i+k-1}
// If the max sequence is j then j=i+k-1, and then the lowest element i is j-k+1
evictedIndex := c.maxSeq - uint64(cachedItemCount) + 1
evictedBlock, exists := c.cache[evictedIndex]
if !exists {
panic(fmt.Sprintf("programming error: last stored block sequence is %d and cached block count"+
" is %d but index to be evicted %d was not found", c.maxSeq, cachedItemCount, evictedIndex))
}
delete(c.cache, evictedIndex) // Delete minimum entry
c.sizeBytes -= evictedBlock.blockSize
}
171 changes: 171 additions & 0 deletions common/ledger/blkstorage/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package blkstorage

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/stretchr/testify/require"
)

func TestCacheDisabled(t *testing.T) {
c := newCache(0)

c.put(&common.Block{Header: &common.BlockHeader{}}, 0)
block, exists := c.get(0)

assertNotCached(t, exists, block)
}

func TestNotCachingTooSmallEntry(t *testing.T) {
c := newCache(10)

for i := 100; i < 105; i++ {
block := &common.Block{
Header: &common.BlockHeader{Number: uint64(i)},
}
c.put(block, 1)
}

c.put(&common.Block{Header: &common.BlockHeader{Number: 99}}, 1)
block, exists := c.get(99)

assertNotCached(t, exists, block)
}

func TestTooBigEntryNotCached(t *testing.T) {
c := newCache(10)

c.put(&common.Block{Header: &common.BlockHeader{Number: 100}}, 11)
block, exists := c.get(100)

assertNotCached(t, exists, block)
}

func TestOutOfOrderInsertionPanics(t *testing.T) {
c := newCache(10)

c.put(&common.Block{Header: &common.BlockHeader{Number: 100}}, 1)
block, exists := c.get(100)

assertCached(t, exists, block, 100)

func() {
defer func() {
err := recover()
assert.Contains(t, err.(string), "detected out of order block insertion: attempted to insert block number 102 but highest block is 100")
}()

c.put(&common.Block{Header: &common.BlockHeader{Number: 102}}, 1)
}()
}

func TestDoubleInsertionPanics(t *testing.T) {
c := newCache(10)

c.put(&common.Block{Header: &common.BlockHeader{Number: 100}}, 1)
block, exists := c.get(100)

assertCached(t, exists, block, 100)

func() {
defer func() {
err := recover()
assert.Contains(t, err.(string), "detected insertion of the same block (100) twice")
}()

c.put(&common.Block{Header: &common.BlockHeader{Number: 100}}, 1)
}()
}

func TestTooBigEntryEvictsSmallerEntries(t *testing.T) {
c := newCache(10)

c.put(&common.Block{Header: &common.BlockHeader{Number: 1}}, 1)
c.put(&common.Block{Header: &common.BlockHeader{Number: 2}}, 1)
c.put(&common.Block{Header: &common.BlockHeader{Number: 3}}, 1)

block, exists := c.get(1)
assertCached(t, exists, block, 1)

block, exists = c.get(2)
assertCached(t, exists, block, 2)

block, exists = c.get(3)
assertCached(t, exists, block, 3)

c.put(&common.Block{Header: &common.BlockHeader{Number: 4}}, 10)

block, exists = c.get(1)
assertNotCached(t, exists, block)

block, exists = c.get(2)
assertNotCached(t, exists, block)

block, exists = c.get(3)
assertNotCached(t, exists, block)

block, exists = c.get(4)
assertCached(t, exists, block, 4)
}

func TestCacheEviction(t *testing.T) {
c := newCache(10)

for i := 0; i < 10; i++ {
block := &common.Block{
Header: &common.BlockHeader{Number: uint64(i)},
}
c.put(block, 1)
}

for i := 10; i < 20; i++ {
// Ensure items 11 blocks in the past are not cached, but evicted
if uint64(i) > 10 {
block, exists := c.get(uint64(i) - 11)
assertNotCached(t, exists, block)
}
// Ensure items 10 blocks in the past are still cached
block, exists := c.get(uint64(i) - 10)
assertCached(t, exists, block, uint64(i)-10)

block = &common.Block{
Header: &common.BlockHeader{Number: uint64(i)},
}
c.put(block, 1)
}

block, exists := c.get(9)
assertNotCached(t, exists, block)

for i := 10; i < 20; i++ {
block, exists := c.get(uint64(i))
assertCached(t, exists, block, uint64(i))
}
}

func assertNotCached(t *testing.T, exists bool, block *common.Block) {
assertWasCached(t, exists, block, 0, false)
}

func assertCached(t *testing.T, exists bool, block *common.Block, expectedSeq uint64) {
assertWasCached(t, exists, block, expectedSeq, true)
}

func assertWasCached(t *testing.T, exists bool, block *common.Block, expectedSeq uint64, expectedExists bool) {
if !expectedExists {
require.False(t, exists)
require.Nil(t, block)
return
}
require.True(t, exists)
require.NotNil(t, block)
require.Equal(t, expectedSeq, block.Header.Number)
}

0 comments on commit 45cefb2

Please sign in to comment.