eth: support bubbling up bad blocks from sync to the engine API #25190

Merged 6 commits on Jul 25, 2022
2 changes: 1 addition & 1 deletion core/beacon/types.go
@@ -42,7 +42,7 @@ type payloadAttributesMarshaling struct {

//go:generate go run github.com/fjl/gencodec -type ExecutableDataV1 -field-override executableDataMarshaling -out gen_ed.go

// ExecutableDataV1 structure described at https://github.com/ethereum/execution-apis/src/engine/specification.md
// ExecutableDataV1 structure described at https://github.com/ethereum/execution-apis/tree/main/src/engine/specification.md
type ExecutableDataV1 struct {
ParentHash common.Hash `json:"parentHash" gencodec:"required"`
FeeRecipient common.Address `json:"feeRecipient" gencodec:"required"`
139 changes: 127 additions & 12 deletions eth/catalyst/api.go
@@ -50,12 +50,47 @@ func Register(stack *node.Node, backend *eth.Ethereum) error {
return nil
}

const (
// invalidBlockHitEviction is the number of times an invalid block can be
// referenced in forkchoice update or new payload before it is attempted
// to be reprocessed again.
invalidBlockHitEviction = 128

// invalidTipsetsCap is the max number of recent block hashes tracked that
// have led to some bad ancestor block. It's just an OOM protection.
invalidTipsetsCap = 512
)
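The two constants above bound the tracking in different dimensions: `invalidBlockHitEviction` caps how often a bad block may be re-referenced before it is forgotten and given another import attempt. A minimal, self-contained sketch of that hit-counter eviction (simplified names, not the actual Geth code):

```go
package main

import "fmt"

const hitEviction = 128 // mirrors invalidBlockHitEviction in the PR

// recordHit bumps the counter for a bad hash and reports whether the
// block has earned another import attempt (counter evicted).
func recordHit(hits map[string]int, hash string) (retry bool) {
	hits[hash]++
	if hits[hash] >= hitEviction {
		delete(hits, hash) // forget it; let the next import try again
		return true
	}
	return false
}

func main() {
	hits := map[string]int{}
	retried := 0
	for i := 0; i < 300; i++ {
		if recordHit(hits, "bad") {
			retried++
		}
	}
	fmt.Println(retried) // 2: evicted at hit 128 and again at hit 256
}
```

After eviction the counter restarts from zero, so a persistently re-fed bad block gets a fresh import attempt roughly every 128 references.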

type ConsensusAPI struct {
eth *eth.Ethereum
eth *eth.Ethereum

remoteBlocks *headerQueue // Cache of remote payloads received
localBlocks *payloadQueue // Cache of local payloads generated
// Lock for the forkChoiceUpdated method
forkChoiceLock sync.Mutex

// The forkchoice update and new payload method require us to return the
// latest valid hash in an invalid chain. To support that return, we need
// to track historical bad blocks as well as bad tipsets in case a chain
// is constantly built on it.
//
// There are a few important caveats in this mechanism:
// - The bad block tracking is ephemeral, in-memory only. We must never
// persist any bad block information to disk as a bug in Geth could end
// up blocking a valid chain, even if a later Geth update would accept
// it.
// - Bad blocks will get forgotten after a certain threshold of import
// attempts and will be retried. The rationale is that if the network
// really-really-really tries to feed us a block, we should give it a
// new chance, perhaps us being racy instead of the block being legit
// bad (this happened in Geth at a point with import vs. pending race).
// - Tracking all the blocks built on top of the bad one could be a bit
// problematic, so we will only track the head chain segment of a bad
// chain to allow discarding progressing bad chains and side chains,
// without tracking too much bad data.
invalidBlocksHits map[common.Hash]int // Ephemeral cache to track invalid blocks and their hit count
invalidTipsets map[common.Hash]*types.Header // Ephemeral cache to track invalid tipsets and their bad ancestor
invalidLock sync.Mutex // Protects the invalid maps from concurrent access

forkChoiceLock sync.Mutex // Lock for the forkChoiceUpdated method
}

// NewConsensusAPI creates a new consensus api for the given backend.
@@ -64,11 +99,16 @@ func NewConsensusAPI(eth *eth.Ethereum) *ConsensusAPI {
if eth.BlockChain().Config().TerminalTotalDifficulty == nil {
log.Warn("Engine API started but chain not configured for merge yet")
}
return &ConsensusAPI{
eth: eth,
remoteBlocks: newHeaderQueue(),
localBlocks: newPayloadQueue(),
api := &ConsensusAPI{
eth: eth,
remoteBlocks: newHeaderQueue(),
localBlocks: newPayloadQueue(),
invalidBlocksHits: make(map[common.Hash]int),
invalidTipsets: make(map[common.Hash]*types.Header),
}
eth.Downloader().SetBadBlockCallback(api.setInvalidAncestor)

return api
}

// ForkchoiceUpdatedV1 has several responsibilities:
@@ -96,6 +136,10 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
// reason.
block := api.eth.BlockChain().GetBlockByHash(update.HeadBlockHash)
if block == nil {
// If this block was previously invalidated, keep rejecting it here too
if res := api.checkInvalidAncestor(update.HeadBlockHash, update.HeadBlockHash); res != nil {
return beacon.ForkChoiceResponse{PayloadStatus: *res, PayloadID: nil}, nil
}
// If the head hash is unknown (was not given to us in a newPayload request),
// we cannot resolve the header, so not much to do. This could be extended in
// the future to resolve from the `eth` network, but it's an unexpected case
@@ -266,6 +310,10 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
hash := block.Hash()
return beacon.PayloadStatusV1{Status: beacon.VALID, LatestValidHash: &hash}, nil
}
// If this block was rejected previously, keep rejecting it
if res := api.checkInvalidAncestor(block.Hash(), block.Hash()); res != nil {
return *res, nil
}
// If the parent is missing, we - in theory - could trigger a sync, but that
// would also entail a reorg. That is problematic if multiple sibling blocks
// are being fed to us, and even more so, if some semi-distant uncle shortens
@@ -293,7 +341,7 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
}
if block.Time() <= parent.Time() {
log.Warn("Invalid timestamp", "parent", parent.Time(), "block", block.Time())
return api.invalid(errors.New("invalid timestamp"), parent), nil
return api.invalid(errors.New("invalid timestamp"), parent.Header()), nil
}
// Another cornercase: if the node is in snap sync mode, but the CL client
// tries to make it import a block. That should be denied as pushing something
@@ -310,7 +358,13 @@ func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.Pa
log.Trace("Inserting block without sethead", "hash", block.Hash(), "number", block.Number)
if err := api.eth.BlockChain().InsertBlockWithoutSetHead(block); err != nil {
log.Warn("NewPayloadV1: inserting block failed", "error", err)
return api.invalid(err, parent), nil

api.invalidLock.Lock()
api.invalidBlocksHits[block.Hash()] = 1
api.invalidTipsets[block.Hash()] = block.Header()
api.invalidLock.Unlock()

return api.invalid(err, parent.Header()), nil
}
// We've accepted a valid payload from the beacon client. Mark the local
// chain transitions to notify other subsystems (e.g. downloader) of the
@@ -339,8 +393,13 @@ func computePayloadId(headBlockHash common.Hash, params *beacon.PayloadAttribute
// delayPayloadImport stashes the given block away for import at a later time,
// either via a forkchoice update or a sync extension. This method is meant to
// be called by the newpayload command when the block seems to be ok, but some
// prerequisite prevents it from being processed (e.g. no parent, or nap sync).
// prerequisite prevents it from being processed (e.g. no parent, or snap sync).
func (api *ConsensusAPI) delayPayloadImport(block *types.Block) (beacon.PayloadStatusV1, error) {
// Sanity check that this block's parent is not on a previously invalidated
// chain. If it is, mark the block as invalid too.
if res := api.checkInvalidAncestor(block.ParentHash(), block.Hash()); res != nil {
return *res, nil
}
// Stash the block away for a potential forced forkchoice update to it
// at a later time.
api.remoteBlocks.put(block.Hash(), block.Header())
@@ -360,14 +419,70 @@ func (api *ConsensusAPI) delayPayloadImport(block *types.Block) (beacon.PayloadS
return beacon.PayloadStatusV1{Status: beacon.ACCEPTED}, nil
}

// setInvalidAncestor is a callback for the downloader to notify us if a bad block
// is encountered during the async sync.
func (api *ConsensusAPI) setInvalidAncestor(invalid *types.Header, origin *types.Header) {
api.invalidLock.Lock()
defer api.invalidLock.Unlock()

api.invalidTipsets[origin.Hash()] = invalid
api.invalidBlocksHits[invalid.Hash()]++
}

// checkInvalidAncestor checks whether the specified chain end links to a known
// bad ancestor. If yes, it constructs the payload failure response to return.
func (api *ConsensusAPI) checkInvalidAncestor(check common.Hash, head common.Hash) *beacon.PayloadStatusV1 {
api.invalidLock.Lock()
defer api.invalidLock.Unlock()

// If the hash to check is unknown, return valid
invalid, ok := api.invalidTipsets[check]
if !ok {
return nil
}
// If the bad hash was hit too many times, evict it and try to reprocess in
// the hopes that we have a data race that we can exit out of.
badHash := invalid.Hash()

api.invalidBlocksHits[badHash]++
if api.invalidBlocksHits[badHash] >= invalidBlockHitEviction {
Member:

It's problematic to me.

If we try to re-process a block and eventually it's adopted this time, we only delete "hit marker", but leave "invalidTipSet" unchanged with all descendants still marked as BAD.

Let's imagine the scenario:

  • We have BAD BLOCK X, its descendant X+1 are also marked in the "invalidTipSet"
  • Process X, Block X gets adopted somehow, and invalidBlocksHits[X] is reset to 0
  • Process X+1, invalidTipsets[X+1] is still existent and invalidBlocksHits[X] is 0
  • X+1 will be rejected

Contributor:

I'm kind of wondering how we expect the CL to operate here. Considering that geth deems Bn to be invalid, would the CL

a) Try to import it over and over Bn, Bn, Bn, Bn...., or
b) Try to import the tips after it: Bn, Bn+1, Bn+2....

This PR lgtm if we expect scenario a), but probably won't work great if we expect scenario b).

?

Member:

I think both of them are expected. Different CL's have different strategies regarding retrying invalid blocks/syncing etc

Member Author:

> I'm kind of wondering how we expect the CL to operate here. Considering that geth deems Bn to be invalid, would the CL
>
> a) Try to import it over and over Bn, Bn, Bn, Bn...., or b) Try to import the tips after it: Bn, Bn+1, Bn+2....
>
> This PR lgtm if we expect scenario a), but probably won't work great if we expect scenario b).
>
> ?

Both scenarios are handled. When a NewPayload is received, we check the block against the set of invalid blocks. If it is present, we reject if (scenario A). If the block is not present, we try to retrieve the parent state (to process on top) and if that's missing, we check the parent's hash for bad block-ness. If it is, we reject the payload (scenario B).
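The two-level lookup described in this reply can be sketched as a tiny standalone program. This is a simplification for illustration only, with hashes as strings and hypothetical names (`badCache`, `checkInvalid`), not the actual Geth types:

```go
package main

import "fmt"

// badCache is a simplified stand-in for the invalidTipsets map: it maps a
// block hash to the known bad ancestor on that block's chain. The bad block
// itself is tracked as its own ancestor (scenario A); descendants built on
// top of it map back to the same bad ancestor (scenario B).
type badCache struct {
	tipsets map[string]string // tip hash -> bad ancestor hash
}

// checkInvalid reports whether a hash is a tracked tip of a bad chain, and
// if so, which bad ancestor condemns it.
func (c *badCache) checkInvalid(hash string) (badAncestor string, rejected bool) {
	bad, ok := c.tipsets[hash]
	return bad, ok
}

func main() {
	c := &badCache{tipsets: map[string]string{
		"Bn":   "Bn", // scenario A: the bad block itself
		"Bn+1": "Bn", // scenario B: a descendant tip of the bad chain
	}}
	for _, h := range []string{"Bn", "Bn+1", "Good"} {
		if bad, rej := c.checkInvalid(h); rej {
			fmt.Printf("%s rejected (bad ancestor %s)\n", h, bad)
		} else {
			fmt.Printf("%s not known bad, try to process\n", h)
		}
	}
}
```

In the real PR the same map is consulted from both `NewPayloadV1` (against the block's own hash, and against the parent hash when the parent state is missing) and `ForkchoiceUpdatedV1` (against the head hash), which is what covers both scenarios.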

Member Author:

> It's problematic to me.
>
> If we try to re-process a block and eventually it's adopted this time, we only delete "hit marker", but leave "invalidTipSet" unchanged with all descendants still marked as BAD.
>
> Let's imagine the scenario:
>
>   • We have BAD BLOCK X, its descendant X+1 are also marked in the "invalidTipSet"
>   • Process X, Block X gets adopted somehow, and invalidBlocksHits[X] is reset to 0
>   • Process X+1, invalidTipsets[X+1] is still existent and invalidBlocksHits[X] is 0
>   • X+1 will be rejected

Good catch. We need to clean up all the future blocks referencing the same bad block.

log.Warn("Too many bad block import attempts, retrying", "number", invalid.Number, "hash", badHash)
delete(api.invalidBlocksHits, badHash)

for descendant, badHeader := range api.invalidTipsets {
if badHeader.Hash() == badHash {
delete(api.invalidTipsets, descendant)
}
}
return nil
}
// Not too many failures yet, mark the head of the invalid chain as invalid
if check != head {
log.Warn("Marked new chain head as invalid", "hash", head, "badnumber", invalid.Number, "badhash", badHash)
for len(api.invalidTipsets) >= invalidTipsetsCap {
for key := range api.invalidTipsets {
delete(api.invalidTipsets, key)
break
}
}
api.invalidTipsets[head] = invalid
}
failure := "links to previously rejected block"
return &beacon.PayloadStatusV1{
Status: beacon.INVALID,
LatestValidHash: &invalid.ParentHash,
ValidationError: &failure,
}
}
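The cap loop above keeps `invalidTipsets` bounded by deleting an arbitrary entry before each insert, exploiting Go's unspecified map iteration order to pick a victim. A minimal sketch of that trick in isolation (`evictToCap` is a hypothetical name, not a Geth function):

```go
package main

import "fmt"

// evictToCap removes arbitrary entries until the map is strictly below the
// limit, relying on Go's randomized map iteration order to choose victims;
// the same pattern as the invalidTipsetsCap loop in the PR.
func evictToCap(m map[string]int, limit int) {
	for len(m) >= limit {
		for k := range m {
			delete(m, k)
			break // one victim per pass
		}
	}
}

func main() {
	m := map[string]int{"a": 1, "b": 2, "c": 3, "d": 4}
	evictToCap(m, 3)
	fmt.Println(len(m)) // 2: deletes until len < limit, then one insert keeps it <= limit
}
```

Since the loop runs before adding the new head, the map never exceeds the cap after insertion; which entries are evicted is deliberately unimportant, as the cache is only an OOM guard.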

// invalid returns an "INVALID" payload status with the latest valid hash taken
// from the supplied header, or set to the current head if no latestValid block
// was provided.
func (api *ConsensusAPI) invalid(err error, latestValid *types.Block) beacon.PayloadStatusV1 {
func (api *ConsensusAPI) invalid(err error, latestValid *types.Header) beacon.PayloadStatusV1 {
currentHash := api.eth.BlockChain().CurrentBlock().Hash()
if latestValid != nil {
// Set latest valid hash to 0x0 if parent is PoW block
currentHash = common.Hash{}
if latestValid.Difficulty().BitLen() == 0 {
if latestValid.Difficulty.BitLen() == 0 {
// Otherwise set latest valid hash to parent hash
currentHash = latestValid.Hash()
}
6 changes: 3 additions & 3 deletions eth/catalyst/api_test.go
@@ -773,16 +773,16 @@ func TestTrickRemoteBlockCache(t *testing.T) {
if err != nil {
panic(err)
}
if status.Status == beacon.INVALID {
panic("success")
if status.Status == beacon.VALID {
t.Error("invalid status: VALID on an invalid chain")
}
// Now reorg to the head of the invalid chain
resp, err := apiB.ForkchoiceUpdatedV1(beacon.ForkchoiceStateV1{HeadBlockHash: payload.BlockHash, SafeBlockHash: payload.BlockHash, FinalizedBlockHash: payload.ParentHash}, nil)
if err != nil {
t.Fatal(err)
}
if resp.PayloadStatus.Status == beacon.VALID {
t.Errorf("invalid status: expected INVALID got: %v", resp.PayloadStatus.Status)
t.Error("invalid status: VALID on an invalid chain")
}
time.Sleep(100 * time.Millisecond)
}
7 changes: 7 additions & 0 deletions eth/downloader/beaconsync.go
@@ -137,6 +137,13 @@ func (b *beaconBackfiller) setMode(mode SyncMode) {
b.resume()
}

// SetBadBlockCallback sets the callback to run when a bad block is hit by the
// block processor. This method is not thread safe and should only be called
// once, on startup, before system events are fired.
func (d *Downloader) SetBadBlockCallback(onBadBlock badBlockFn) {
d.badBlock = onBadBlock
}

// BeaconSync is the post-merge version of the chain synchronization, where the
// chain is not downloaded from genesis onward, rather from trusted head announces
// backwards.
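The PR wires the catalyst API to the downloader through a late-bound callback field rather than a direct dependency, which keeps `eth/downloader` free of any engine-API types. A minimal sketch of that registration pattern, with simplified types and hypothetical method names (`reportBad` does not exist in Geth; the real call site is in `importBlockResults`):

```go
package main

import "fmt"

// badBlockFn mirrors the downloader's callback type, with headers
// simplified to plain strings for the sketch.
type badBlockFn func(invalid, origin string)

type Downloader struct {
	badBlock badBlockFn // nil until a consumer registers interest
}

// SetBadBlockCallback is called once at startup, before sync runs; the
// field is written without locking, as in the PR.
func (d *Downloader) SetBadBlockCallback(fn badBlockFn) { d.badBlock = fn }

// reportBad invokes the callback if one was registered; the callback is
// strictly optional, so a nil check guards the call.
func (d *Downloader) reportBad(invalid, origin string) {
	if d.badBlock != nil {
		d.badBlock(invalid, origin)
	}
}

func main() {
	d := &Downloader{}
	d.reportBad("X", "head") // no-op: nothing registered yet
	d.SetBadBlockCallback(func(invalid, origin string) {
		fmt.Printf("bad block %s while syncing to %s\n", invalid, origin)
	})
	d.reportBad("X", "head")
}
```

This is the same shape as the existing `peerDropFn` callback, so the addition stays idiomatic for the package.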
17 changes: 16 additions & 1 deletion eth/downloader/downloader.go
@@ -85,6 +85,10 @@ var (
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)

// badBlockFn is a callback for the async beacon sync to notify the caller that
// the origin header requested to sync to, produced a chain with a bad block.
type badBlockFn func(invalid *types.Header, origin *types.Header)

// headerTask is a set of downloaded headers to queue along with their precomputed
// hashes to avoid constant rehashing.
type headerTask struct {
@@ -113,6 +117,7 @@ type Downloader struct {

// Callbacks
dropPeer peerDropFn // Drops a peer for misbehaving
badBlock badBlockFn // Reports a block as rejected by the chain

// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
@@ -1528,7 +1533,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
return errCancelContentProcessing
default:
}
// Retrieve the a batch of results to import
// Retrieve a batch of results to import
first, last := results[0].Header, results[len(results)-1].Header
log.Debug("Inserting downloaded chain", "items", len(results),
"firstnum", first.Number, "firsthash", first.Hash(),
@@ -1544,6 +1549,16 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
if index, err := d.blockchain.InsertChain(blocks); err != nil {
if index < len(results) {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)

// In post-merge, notify the engine API of encountered bad chains
if d.badBlock != nil {
head, _, err := d.skeleton.Bounds()
if err != nil {
log.Error("Failed to retrieve beacon bounds for bad block reporting", "err", err)
} else {
d.badBlock(blocks[index].Header(), head)
Member:

So blocks[index] is the bad block and head is the head that we currently trying to sync to, right?

Member Author:

Yes

Member:

This index can be inaccurate. e.g. here https://github.com/ethereum/go-ethereum/blob/master/core/blockchain.go#L1511 the returned index is the index of PrunedBlock, but not the real bad block position.

But this can be fixed in a following PR

Member Author:

Recovering the ancestor shouldn't fail though, it if was already once imported?

Member:

theoretically, yes. It shouldn't fail.

}
}
} else {
// The InsertChain method in blockchain.go will sometimes return an out-of-bounds index,
// when it needs to preprocess blocks to import a sidechain.