Skip to content

Commit

Permalink
core: remove notion of future blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed Mar 5, 2024
1 parent a31c044 commit 05c256f
Showing 1 changed file with 5 additions and 103 deletions.
108 changes: 5 additions & 103 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/ethereum/go-ethereum/triedb"
"github.com/ethereum/go-ethereum/triedb/hashdb"
"github.com/ethereum/go-ethereum/triedb/pathdb"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -97,13 +96,11 @@ var (
)

const (
bodyCacheLimit = 256
blockCacheLimit = 256
receiptsCacheLimit = 32
txLookupCacheLimit = 1024
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
TriesInMemory = 128
bodyCacheLimit = 256
blockCacheLimit = 256
receiptsCacheLimit = 32
txLookupCacheLimit = 1024
TriesInMemory = 128

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
//
Expand Down Expand Up @@ -245,9 +242,6 @@ type BlockChain struct {
blockCache *lru.Cache[common.Hash, *types.Block]
txLookupCache *lru.Cache[common.Hash, txLookup]

// future blocks are blocks added for later processing
futureBlocks *lru.Cache[common.Hash, *types.Block]

wg sync.WaitGroup
quit chan struct{} // shutdown signal, closed in Stop.
stopping atomic.Bool // false if chain is running, true when stopped
Expand Down Expand Up @@ -299,7 +293,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
engine: engine,
vmConfig: vmConfig,
}
Expand Down Expand Up @@ -449,11 +442,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
}
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root)
}

// Start future block processor.
bc.wg.Add(1)
go bc.updateFutureBlocks()

// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
Expand Down Expand Up @@ -794,7 +782,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
bc.receiptsCache.Purge()
bc.blockCache.Purge()
bc.txLookupCache.Purge()
bc.futureBlocks.Purge()

// Clear safe block, finalized block if needed
if safe := bc.CurrentSafeBlock(); safe != nil && head < safe.Number.Uint64() {
Expand Down Expand Up @@ -1048,24 +1035,6 @@ func (bc *BlockChain) insertStopped() bool {
return bc.procInterrupt.Load()
}

func (bc *BlockChain) procFutureBlocks() {
blocks := make([]*types.Block, 0, bc.futureBlocks.Len())
for _, hash := range bc.futureBlocks.Keys() {
if block, exist := bc.futureBlocks.Peek(hash); exist {
blocks = append(blocks, block)
}
}
if len(blocks) > 0 {
slices.SortFunc(blocks, func(a, b *types.Block) int {
return a.Number().Cmp(b.Number())
})
// Insert one by one as chain insertion needs contiguous ancestry between blocks
for i := range blocks {
bc.InsertChain(blocks[i : i+1])
}
}
}

// WriteStatus status of write
type WriteStatus byte

Expand Down Expand Up @@ -1466,8 +1435,6 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
if status == CanonStatTy {
bc.writeHeadBlock(block)
}
bc.futureBlocks.Remove(block.Hash())

if status == CanonStatTy {
bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
if len(logs) > 0 {
Expand All @@ -1487,25 +1454,6 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
return status, nil
}

// addFutureBlock checks if the block is within the max allowed window to get
// accepted for future processing, and returns an error if the block is too far
// ahead and was not added.
//
// TODO after the transition, the future block shouldn't be kept. Because
// it's not checked in the Geth side anymore.
func (bc *BlockChain) addFutureBlock(block *types.Block) error {
max := uint64(time.Now().Unix() + maxTimeFutureBlocks)
if block.Time() > max {
return fmt.Errorf("future block timestamp %v > allowed %v", block.Time(), max)
}
if block.Difficulty().Cmp(common.Big0) == 0 {
// Never add PoS blocks into the future queue
return nil
}
bc.futureBlocks.Add(block.Hash(), block)
return nil
}

// InsertChain attempts to insert the given batch of blocks in to the canonical
// chain or, otherwise, create a fork. If an error is returned it will return
// the index number of the failing block as well an error describing what went
Expand Down Expand Up @@ -1643,26 +1591,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
_, err := bc.recoverAncestors(block)
return it.index, err
}
// First block is future, shove it (and all children) to the future queue (unknown ancestor)
case errors.Is(err, consensus.ErrFutureBlock) || (errors.Is(err, consensus.ErrUnknownAncestor) && bc.futureBlocks.Contains(it.first().ParentHash())):
for block != nil && (it.index == 0 || errors.Is(err, consensus.ErrUnknownAncestor)) {
log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash())
if err := bc.addFutureBlock(block); err != nil {
return it.index, err
}
block, err = it.next()
}
stats.queued += it.processed()
stats.ignored += it.remaining()

// If there are any still remaining, mark as ignored
return it.index, err

// Some other error(except ErrKnownBlock) occurred, abort.
// ErrKnownBlock is allowed here since some known blocks
// still need re-execution to generate snapshots that are missing
case err != nil && !errors.Is(err, ErrKnownBlock):
bc.futureBlocks.Remove(block.Hash())
stats.ignored += len(it.chain)
bc.reportBlock(block, nil, err)
return it.index, err
Expand Down Expand Up @@ -1867,23 +1799,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
"root", block.Root())
}
}

// Any blocks remaining here? The only ones we care about are the future ones
if block != nil && errors.Is(err, consensus.ErrFutureBlock) {
if err := bc.addFutureBlock(block); err != nil {
return it.index, err
}
block, err = it.next()

for ; block != nil && errors.Is(err, consensus.ErrUnknownAncestor); block, err = it.next() {
if err := bc.addFutureBlock(block); err != nil {
return it.index, err
}
stats.queued++
}
}
stats.ignored += it.remaining()

return it.index, err
}

Expand Down Expand Up @@ -2334,20 +2250,6 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
return head.Hash(), nil
}

func (bc *BlockChain) updateFutureBlocks() {
futureTimer := time.NewTicker(5 * time.Second)
defer futureTimer.Stop()
defer bc.wg.Done()
for {
select {
case <-futureTimer.C:
bc.procFutureBlocks()
case <-bc.quit:
return
}
}
}

// skipBlock returns 'true', if the block being imported can be skipped over, meaning
// that the block does not need to be processed but can be considered already fully 'done'.
func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
Expand Down

0 comments on commit 05c256f

Please sign in to comment.