Skip to content

Commit

Permalink
Merge pull request #155 from OffchainLabs/shutdown-markers
Browse files Browse the repository at this point in the history
support non-clean shutdown
  • Loading branch information
PlasmaPower committed Sep 1, 2022
2 parents 0defad1 + 7503143 commit ee7893f
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 29 deletions.
9 changes: 9 additions & 0 deletions arbitrum/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/shutdowncheck"
"github.com/ethereum/go-ethereum/node"
)

Expand All @@ -24,6 +25,8 @@ type Backend struct {
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports

shutdownTracker *shutdowncheck.ShutdownTracker

chanTxs chan *types.Transaction
chanClose chan struct{} //close coroutine
chanNewBlock chan struct{} //create new L2 block unless empty
Expand All @@ -39,6 +42,8 @@ func NewBackend(stack *node.Node, config *Config, chainDb ethdb.Database, publis
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: core.NewBloomIndexer(chainDb, config.BloomBitsBlocks, config.BloomConfirms),

shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),

chanTxs: make(chan *types.Transaction, 100),
chanClose: make(chan struct{}),
chanNewBlock: make(chan struct{}, 1),
Expand All @@ -49,6 +54,7 @@ func NewBackend(stack *node.Node, config *Config, chainDb ethdb.Database, publis
if err != nil {
return nil, err
}
backend.shutdownTracker.MarkStartup()
return backend, nil
}

Expand Down Expand Up @@ -79,13 +85,16 @@ func (b *Backend) ArbInterface() ArbInterface {
// TODO: this is used when registering backend as lifecycle in stack
func (b *Backend) Start() error {
b.startBloomHandlers(b.config.BloomBitsBlocks)
b.shutdownTracker.Start()

return nil
}

func (b *Backend) Stop() error {
b.scope.Close()
b.bloomIndexer.Close()
b.shutdownTracker.Stop()
b.chainDb.Close()
close(b.chanClose)
return nil
}
76 changes: 50 additions & 26 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"io"
"math"
"math/big"
"sort"
"sync"
Expand Down Expand Up @@ -91,7 +92,6 @@ const (
txLookupCacheLimit = 1024
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
TriesInMemory = 128

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
//
Expand Down Expand Up @@ -132,12 +132,21 @@ type CacheConfig struct {
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
Preimages bool // Whether to store preimage of trie key to the disk

// Arbitrum: configure GC window
TriesInMemory uint64 // Height difference before which a trie may not be garbage-collected
TrieRetention time.Duration // Time limit before which a trie may not be garbage-collected

SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
}

// defaultCacheConfig are the default caching values if none are specified by the
// user (also used during testing).
var defaultCacheConfig = &CacheConfig{

// Arbitrum Config Options
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down Expand Up @@ -215,6 +224,11 @@ type BlockChain struct {
vmConfig vm.Config
}

type trieGcEntry struct {
Root common.Hash
Timestamp uint64
}

// NewBlockChain returns a fully initialised block chain using information
// available in the database. It initialises the default Ethereum Validator
// and Processor.
Expand Down Expand Up @@ -870,9 +884,15 @@ func (bc *BlockChain) Stop() {
if !bc.cacheConfig.TrieDirtyDisabled {
triedb := bc.stateCache.TrieDB()

for _, offset := range []uint64{0, 1, TriesInMemory - 1} {
for _, offset := range []uint64{0, 1, bc.cacheConfig.TriesInMemory - 1, math.MaxUint64} {
if number := bc.CurrentBlock().NumberU64(); number > offset {
recent := bc.GetBlockByNumber(number - offset)
var recent *types.Block
if offset == math.MaxUint {
_, latest := bc.triegc.Peek()
recent = bc.GetBlockByNumber(uint64(-latest))
} else {
recent = bc.GetBlockByNumber(number - offset)
}
if recent.Root() == (common.Hash{}) {
continue
}
Expand All @@ -890,7 +910,7 @@ func (bc *BlockChain) Stop() {
}
}
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem().(common.Hash))
triedb.Dereference(bc.triegc.PopItem().(trieGcEntry).Root)
}
if size, _ := triedb.Size(); size != 0 {
log.Error("Dangling trie nodes after full cleanup")
Expand Down Expand Up @@ -1213,8 +1233,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return 0, nil
}

var lastWrite uint64

// writeBlockWithoutState writes only the block and its metadata to the database,
// but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty.
Expand Down Expand Up @@ -1281,9 +1299,12 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
} else {
// Full but not archive node, do proper garbage collection
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -int64(block.NumberU64()))
bc.triegc.Push(trieGcEntry{root, block.Header().Time}, -int64(block.NumberU64()))

blockLimit := int64(block.NumberU64()) - int64(bc.cacheConfig.TriesInMemory) // only cleared if below that
timeLimit := time.Now().Unix() - int64(bc.cacheConfig.TrieRetention.Seconds()) // only cleared if less than that

if current := block.NumberU64(); current > TriesInMemory {
if blockLimit > 0 && timeLimit > 0 {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
nodes, imgs = triedb.Size()
Expand All @@ -1292,36 +1313,39 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
if nodes > limit || imgs > 4*1024*1024 {
triedb.Cap(limit - ethdb.IdealBatchSize)
}
// Find the next state trie we need to commit
chosen := current - TriesInMemory

var prevEntry *trieGcEntry
var prevNum uint64
// Garbage collect anything below our required write retention
for !bc.triegc.Empty() {
tmp, number := bc.triegc.Pop()
triegcEntry := tmp.(trieGcEntry)
if uint64(-number) > uint64(blockLimit) || triegcEntry.Timestamp > uint64(timeLimit) {
bc.triegc.Push(triegcEntry, number)
break
}
if prevEntry != nil {
triedb.Dereference(prevEntry.Root)
}
prevEntry = &triegcEntry
prevNum = uint64(-number)
}
// If we exceeded out time allowance, flush an entire trie to disk
if bc.gcproc > bc.cacheConfig.TrieTimeLimit {
if bc.gcproc > bc.cacheConfig.TrieTimeLimit && prevEntry != nil {
// If the header is missing (canonical chain behind), we're reorging a low
// diff sidechain. Suspend committing until this operation is completed.
header := bc.GetHeaderByNumber(chosen)
header := bc.GetHeaderByNumber(prevNum)
if header == nil {
log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
log.Warn("Reorg in progress, trie commit postponed")
} else {
// If we're exceeding limits but haven't reached a large enough memory gap,
// warn the user that the system is becoming unstable.
if chosen < lastWrite+TriesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory)
}
// Flush an entire trie and restart the counters
triedb.Commit(header.Root, true, nil)
lastWrite = chosen
bc.gcproc = 0
}
}
// Garbage collect anything below our required write retention
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
bc.triegc.Push(root, number)
break
}
triedb.Dereference(root.(common.Hash))
if prevEntry != nil {
triedb.Dereference(prevEntry.Root)
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions core/blockchain_arbitrum.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,23 @@
package core

import (
"time"

"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)

// WriteBlockAndSetHeadWithTime also counts processTime, which will cause intermittent TrieDirty cache writes
func (bc *BlockChain) WriteBlockAndSetHeadWithTime(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool, processTime time.Duration) (status WriteStatus, err error) {
if !bc.chainmu.TryLock() {
return NonStatTy, errChainStopped
}
defer bc.chainmu.Unlock()
bc.gcproc += processTime
return bc.writeBlockAndSetHead(block, receipts, logs, state, emitHeadEvent)
}

func (bc *BlockChain) ReorgToOldBlock(newHead *types.Block) error {
bc.wg.Add(1)
defer bc.wg.Done()
Expand Down
10 changes: 10 additions & 0 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1767,6 +1767,11 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
genesis = (&Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
engine = ethash.NewFullFaker()
config = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down Expand Up @@ -1891,6 +1896,11 @@ func TestIssue23496(t *testing.T) {
genesis = (&Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
engine = ethash.NewFullFaker()
config = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down
5 changes: 5 additions & 0 deletions core/blockchain_sethead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1967,6 +1967,11 @@ func testSetHead(t *testing.T, tt *rewindTest, snapshots bool) {
genesis = (&Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
engine = ethash.NewFullFaker()
config = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down
15 changes: 15 additions & 0 deletions core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ func (snaptest *gappedSnapshotTest) test(t *testing.T) {

// Insert a few more blocks without enabling snapshot
var cacheConfig = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down Expand Up @@ -363,6 +368,11 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) {
chain.Stop()

config := &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand All @@ -378,6 +388,11 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) {

// Restart the chain, the wiper should starts working
config = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down
5 changes: 5 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EnablePreimageRecording: config.EnablePreimageRecording,
}
cacheConfig = &core.CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: config.TrieCleanCache,
TrieCleanJournal: stack.ResolvePath(config.TrieCleanCacheJournal),
TrieCleanRejournal: config.TrieCleanCacheRejournal,
Expand Down
2 changes: 1 addition & 1 deletion eth/gasprice/gasprice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func newTestBackend(t *testing.T, londonBlock *big.Int, pending bool) *testBacke
// Construct testing chain
diskdb := rawdb.NewMemoryDatabase()
gspec.Commit(diskdb)
chain, err := core.NewBlockChain(diskdb, &core.CacheConfig{TrieCleanNoPrefetch: true}, gspec.Config, engine, vm.Config{}, nil, nil)
chain, err := core.NewBlockChain(diskdb, &core.CacheConfig{TrieCleanNoPrefetch: true, TriesInMemory: 128}, gspec.Config, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to create local chain, %v", err)
}
Expand Down
5 changes: 5 additions & 0 deletions eth/tracers/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, generator func(i i
// Import the canonical chain
gspec.MustCommit(backend.chaindb)
cacheConfig := &core.CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down
2 changes: 1 addition & 1 deletion miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
}
genesis := gspec.MustCommit(db)

chain, _ := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true}, gspec.Config, engine, vm.Config{}, nil, nil)
chain, _ := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true, TriesInMemory: 128}, gspec.Config, engine, vm.Config{}, nil, nil)
txpool := core.NewTxPool(testTxPoolConfig, chainConfig, chain)

// Generate a small n-block chain and an uncle block for it
Expand Down
2 changes: 1 addition & 1 deletion tests/block_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (t *BlockTest) Run(snapshotter bool) error {
} else {
engine = ethash.NewShared()
}
cache := &core.CacheConfig{TrieCleanLimit: 0}
cache := &core.CacheConfig{TrieCleanLimit: 0, TriesInMemory: 128}
if snapshotter {
cache.SnapshotLimit = 1
cache.SnapshotWait = true
Expand Down

0 comments on commit ee7893f

Please sign in to comment.