From 45cefb29ecc4a375fe67b15a3d5eb5c9f4231d9c Mon Sep 17 00:00:00 2001
From: Yacov Manevich
Date: Wed, 28 Jun 2023 21:41:08 +0200
Subject: [PATCH] Ledger block cache

This commit introduces an in-memory cache for the block storage of the
ledger. It caches newly committed blocks and assumes blocks are committed
in order, with consecutive sequence numbers.

The block iterators now attempt to retrieve blocks from the cache, when
possible, before going to the block storage.

The intent is twofold:
1) Speed up the block Deliver API by avoiding disk I/O when clients
   (peers, orderers) fetch blocks.
2) Reduce the impact of the Deliver API on the writing of new blocks
   into the ledger.

Signed-off-by: Yacov Manevich
---
 common/ledger/blkstorage/blockfile_mgr.go |   6 +-
 common/ledger/blkstorage/blocks_itr.go    |  23 ++-
 common/ledger/blkstorage/cache.go         | 105 +++++++++++++
 common/ledger/blkstorage/cache_test.go    | 171 ++++++++++++++++++++++
 4 files changed, 303 insertions(+), 2 deletions(-)
 create mode 100644 common/ledger/blkstorage/cache.go
 create mode 100644 common/ledger/blkstorage/cache_test.go

diff --git a/common/ledger/blkstorage/blockfile_mgr.go b/common/ledger/blkstorage/blockfile_mgr.go
index ad9295cbc2a..5384b974bd8 100644
--- a/common/ledger/blkstorage/blockfile_mgr.go
+++ b/common/ledger/blkstorage/blockfile_mgr.go
@@ -27,6 +27,7 @@ const (
 	blockfilePrefix                   = "blockfile_"
 	bootstrappingSnapshotInfoFile     = "bootstrappingSnapshot.info"
 	bootstrappingSnapshotInfoTempFile = "bootstrappingSnapshotTemp.info"
+	defaultBlockCacheSizeBytes        = 1024 * 1024 * 50
 )
 
 var blkMgrInfoKey = []byte("blkMgrInfo")
@@ -41,6 +42,7 @@ type blockfileMgr struct {
 	blkfilesInfoCond  *sync.Cond
 	currentFileWriter *blockfileWriter
 	bcInfo            atomic.Value
+	cache             *cache
 }
 
 /*
@@ -96,7 +98,7 @@ func newBlockfileMgr(id string, conf *Conf, indexConfig *IndexConfig, indexStore
 	if err != nil {
 		panic(fmt.Sprintf("Error creating block storage root dir [%s]: %s", rootDir, err))
 	}
-	mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: indexStore}
+	mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: indexStore, cache: newCache(defaultBlockCacheSizeBytes)}
 
 	blockfilesInfo, err := mgr.loadBlkfilesInfo()
 	if err != nil {
@@ -328,6 +330,8 @@ func (mgr *blockfileMgr) addBlock(block *common.Block) error {
 		return errors.WithMessage(err, "error appending block to file")
 	}
 
+	defer mgr.cache.put(block, blockBytesLen)
+
 	// Update the blockfilesInfo with the results of adding the new block
 	currentBlkfilesInfo := mgr.blockfilesInfo
 	newBlkfilesInfo := &blockfilesInfo{
diff --git a/common/ledger/blkstorage/blocks_itr.go b/common/ledger/blkstorage/blocks_itr.go
index b551e6572d1..17e5a7c648e 100644
--- a/common/ledger/blkstorage/blocks_itr.go
+++ b/common/ledger/blkstorage/blocks_itr.go
@@ -20,12 +20,13 @@ type blocksItr struct {
 	stream               *blockStream
 	closeMarker          bool
 	closeMarkerLock      *sync.Mutex
+	cachedPrev           bool
 }
 
 func newBlockItr(mgr *blockfileMgr, startBlockNum uint64) *blocksItr {
 	mgr.blkfilesInfoCond.L.Lock()
 	defer mgr.blkfilesInfoCond.L.Unlock()
-	return &blocksItr{mgr, mgr.blockfilesInfo.lastPersistedBlock, startBlockNum, nil, false, &sync.Mutex{}}
+	return &blocksItr{mgr, mgr.blockfilesInfo.lastPersistedBlock, startBlockNum, nil, false, &sync.Mutex{}, false}
 }
 
 func (itr *blocksItr) waitForBlock(blockNum uint64) uint64 {
@@ -68,12 +69,32 @@ func (itr *blocksItr) Next() (ledger.QueryResult, error) {
 	if itr.closeMarker {
 		return nil, nil
 	}
+
+	cachedBlock, existsInCache := itr.mgr.cache.get(itr.blockNumToRetrieve)
+	if existsInCache {
+		logger.Debugf("Retrieved block %d from ledger in-memory cache", itr.blockNumToRetrieve)
+		itr.cachedPrev = true
+		itr.blockNumToRetrieve++
+		return cachedBlock, nil
+	}
+
+	if itr.cachedPrev {
+		itr.cachedPrev = false
+		if itr.stream != nil {
+			itr.stream.close()
+			itr.stream = nil
+		}
+	}
+
 	if itr.stream == nil {
 		logger.Debugf("Initializing block stream for iterator. itr.maxBlockNumAvailable=%d", itr.maxBlockNumAvailable)
 		if err := itr.initStream(); err != nil {
 			return nil, err
 		}
 	}
+
+	defer logger.Debugf("Retrieved block %d from ledger file storage", itr.blockNumToRetrieve)
+
 	nextBlockBytes, err := itr.stream.nextBlockBytes()
 	if err != nil {
 		return nil, err
diff --git a/common/ledger/blkstorage/cache.go b/common/ledger/blkstorage/cache.go
new file mode 100644
index 00000000000..35c40c549bb
--- /dev/null
+++ b/common/ledger/blkstorage/cache.go
@@ -0,0 +1,105 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package blkstorage
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/hyperledger/fabric-protos-go/common"
+)
+
+const (
+	estimatedBlockSize = 512 * 1024
+)
+
+type cache struct {
+	cacheLock    sync.RWMutex
+	disabled     bool
+	cache        map[uint64]cachedBlock
+	sizeBytes    int
+	maxSeq       uint64
+	maxSizeBytes int
+}
+
+type cachedBlock struct {
+	block     *common.Block
+	blockSize int
+}
+
+func newCache(maxSizeBytes int) *cache {
+	isCacheDisabled := maxSizeBytes == 0
+
+	return &cache{
+		disabled:     isCacheDisabled,
+		cache:        make(map[uint64]cachedBlock, maxSizeBytes/estimatedBlockSize),
+		maxSizeBytes: maxSizeBytes,
+	}
+}
+
+func (c *cache) get(seq uint64) (*common.Block, bool) {
+	if c.disabled {
+		return nil, false
+	}
+
+	c.cacheLock.RLock()
+	defer c.cacheLock.RUnlock()
+
+	cachedBlock, exists := c.cache[seq]
+	return cachedBlock.block, exists
+}
+
+func (c *cache) put(block *common.Block, blockSize int) {
+	if c.disabled {
+		return
+	}
+
+	seq := block.Header.Number
+
+	if c.maxSeq > seq {
+		return
+	}
+
+	if c.maxSeq+1 < seq && c.maxSeq != 0 {
+		panic(fmt.Sprintf("detected out of order block insertion: attempted to insert block number %d but highest block is %d",
+			seq, c.maxSeq))
+	}
+
+	if c.maxSeq == seq && c.maxSeq != 0 {
+		panic(fmt.Sprintf("detected insertion of the same block (%d) twice", seq))
+	}
+
+	// Insert the block to the cache
+	c.maxSeq = seq
+
+	c.cacheLock.Lock()
+	defer c.cacheLock.Unlock()
+
+	c.sizeBytes += blockSize
+
+	c.cache[seq] = cachedBlock{block: block, blockSize: blockSize}
+
+	// If our cache is too big, evict the oldest block
+	for c.sizeBytes > c.maxSizeBytes {
+		c.evictOldestCachedBlock()
+	}
+}
+
+func (c *cache) evictOldestCachedBlock() {
+	cachedItemCount := len(c.cache)
+
+	// Given a series of k > 0 consecutive elements: {i, i+1, i+2, ... , i+k-1}
+	// If the max sequence is j then j=i+k-1, and then the lowest element i is j-k+1
+	evictedIndex := c.maxSeq - uint64(cachedItemCount) + 1
+	evictedBlock, exists := c.cache[evictedIndex]
+	if !exists {
+		panic(fmt.Sprintf("programming error: last stored block sequence is %d and cached block count"+
+			" is %d but index to be evicted %d was not found", c.maxSeq, cachedItemCount, evictedIndex))
+	}
+	delete(c.cache, evictedIndex) // Delete minimum entry
+	c.sizeBytes -= evictedBlock.blockSize
+}
diff --git a/common/ledger/blkstorage/cache_test.go b/common/ledger/blkstorage/cache_test.go
new file mode 100644
index 00000000000..78e8a7f12be
--- /dev/null
+++ b/common/ledger/blkstorage/cache_test.go
@@ -0,0 +1,171 @@
+/*
+Copyright IBM Corp. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
+*/
+
+package blkstorage
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/hyperledger/fabric-protos-go/common"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCacheDisabled(t *testing.T) {
+	c := newCache(0)
+
+	c.put(&common.Block{Header: &common.BlockHeader{}}, 0)
+	block, exists := c.get(0)
+
+	assertNotCached(t, exists, block)
+}
+
+func TestNotCachingTooSmallEntry(t *testing.T) {
+	c := newCache(10)
+
+	for i := 100; i < 105; i++ {
+		block := &common.Block{
+			Header: &common.BlockHeader{Number: uint64(i)},
+		}
+		c.put(block, 1)
+	}
+
+	c.put(&common.Block{Header: &common.BlockHeader{Number: 99}}, 1)
+	block, exists := c.get(99)
+
+	assertNotCached(t, exists, block)
+}
+
+func TestTooBigEntryNotCached(t *testing.T) {
+	c := newCache(10)
+
+	c.put(&common.Block{Header: &common.BlockHeader{Number: 100}}, 11)
+	block, exists := c.get(100)
+
+	assertNotCached(t, exists, block)
+}
+
+func TestOutOfOrderInsertionPanics(t *testing.T) {
+	c := newCache(10)
+
+	c.put(&common.Block{Header: &common.BlockHeader{Number: 100}}, 1)
+	block, exists := c.get(100)
+
+	assertCached(t, exists, block, 100)
+
+	func() {
+		defer func() {
+			err := recover()
+			assert.Contains(t, err.(string), "detected out of order block insertion: attempted to insert block number 102 but highest block is 100")
+		}()
+
+		c.put(&common.Block{Header: &common.BlockHeader{Number: 102}}, 1)
+	}()
+}
+
+func TestDoubleInsertionPanics(t *testing.T) {
+	c := newCache(10)
+
+	c.put(&common.Block{Header: &common.BlockHeader{Number: 100}}, 1)
+	block, exists := c.get(100)
+
+	assertCached(t, exists, block, 100)
+
+	func() {
+		defer func() {
+			err := recover()
+			assert.Contains(t, err.(string), "detected insertion of the same block (100) twice")
+		}()
+
+		c.put(&common.Block{Header: &common.BlockHeader{Number: 100}}, 1)
+	}()
+}
+
+func TestTooBigEntryEvictsSmallerEntries(t *testing.T) {
+	c := newCache(10)
+
+	c.put(&common.Block{Header: &common.BlockHeader{Number: 1}}, 1)
+	c.put(&common.Block{Header: &common.BlockHeader{Number: 2}}, 1)
+	c.put(&common.Block{Header: &common.BlockHeader{Number: 3}}, 1)
+
+	block, exists := c.get(1)
+	assertCached(t, exists, block, 1)
+
+	block, exists = c.get(2)
+	assertCached(t, exists, block, 2)
+
+	block, exists = c.get(3)
+	assertCached(t, exists, block, 3)
+
+	c.put(&common.Block{Header: &common.BlockHeader{Number: 4}}, 10)
+
+	block, exists = c.get(1)
+	assertNotCached(t, exists, block)
+
+	block, exists = c.get(2)
+	assertNotCached(t, exists, block)
+
+	block, exists = c.get(3)
+	assertNotCached(t, exists, block)
+
+	block, exists = c.get(4)
+	assertCached(t, exists, block, 4)
+}
+
+func TestCacheEviction(t *testing.T) {
+	c := newCache(10)
+
+	for i := 0; i < 10; i++ {
+		block := &common.Block{
+			Header: &common.BlockHeader{Number: uint64(i)},
+		}
+		c.put(block, 1)
+	}
+
+	for i := 10; i < 20; i++ {
+		// Ensure items 11 blocks in the past are not cached, but evicted
+		if uint64(i) > 10 {
+			block, exists := c.get(uint64(i) - 11)
+			assertNotCached(t, exists, block)
+		}
+		// Ensure items 10 blocks in the past are still cached
+		block, exists := c.get(uint64(i) - 10)
+		assertCached(t, exists, block, uint64(i)-10)
+
+		block = &common.Block{
+			Header: &common.BlockHeader{Number: uint64(i)},
+		}
+		c.put(block, 1)
+	}
+
+	block, exists := c.get(9)
+	assertNotCached(t, exists, block)
+
+	for i := 10; i < 20; i++ {
+		block, exists := c.get(uint64(i))
+		assertCached(t, exists, block, uint64(i))
+	}
+}
+
+func assertNotCached(t *testing.T, exists bool, block *common.Block) {
+	assertWasCached(t, exists, block, 0, false)
+}
+
+func assertCached(t *testing.T, exists bool, block *common.Block, expectedSeq uint64) {
+	assertWasCached(t, exists, block, expectedSeq, true)
+}
+
+func assertWasCached(t *testing.T, exists bool, block *common.Block, expectedSeq uint64, expectedExists bool) {
+	if !expectedExists {
+		require.False(t, exists)
+		require.Nil(t, block)
+		return
+	}
+	require.True(t, exists)
+	require.NotNil(t, block)
+	require.Equal(t, expectedSeq, block.Header.Number)
+}
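
Reviewer note, not part of the patch: the following is a minimal, hypothetical sketch of how the cache introduced above is expected to be exercised. It is written as if it lived inside the blkstorage package, because the cache type is unexported; the serveBlock and exampleCacheUsage helpers and the sizes used are illustrative assumptions only, not code from this change.

package blkstorage

import (
	"github.com/hyperledger/fabric-protos-go/common"
)

// serveBlock is a hypothetical helper: it returns a block from the cache when
// present and reports whether the caller must fall back to the block files.
func serveBlock(c *cache, seq uint64) (*common.Block, bool) {
	if blk, ok := c.get(seq); ok {
		return blk, true // served from memory, no disk I/O
	}
	return nil, false // caller falls back to the blockfile stream
}

// exampleCacheUsage sketches the commit-side and read-side flow.
func exampleCacheUsage() {
	// A 50 MB budget, mirroring defaultBlockCacheSizeBytes in blockfile_mgr.go;
	// newCache(0) disables the cache entirely.
	c := newCache(50 * 1024 * 1024)

	// Blocks must be added in order with consecutive sequence numbers: put
	// panics on gaps or duplicates, ignores sequences below the highest one,
	// and evicts the oldest entries once the accumulated size exceeds the
	// configured maximum.
	for seq := uint64(1); seq <= 5; seq++ {
		blk := &common.Block{Header: &common.BlockHeader{Number: seq}}
		c.put(blk, 512*1024) // the second argument is the serialized block length
	}

	if blk, ok := serveBlock(c, 3); ok {
		_ = blk // deliver the block without touching the block files
	}
}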