Skip to content

Commit

Permalink
op-node: implement span channel out block count limit
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianst committed Aug 9, 2024
1 parent ec45f66 commit b7d80e4
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 16 deletions.
108 changes: 100 additions & 8 deletions op-node/rollup/derive/channel_out_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,17 @@ import (
"github.com/stretchr/testify/require"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)

var rollupCfg rollup.Config
var rollupCfg = rollup.Config{
Genesis: rollup.Genesis{
L2Time: uint64(1723618465),
},
BlockTime: 2,
L2ChainID: big.NewInt(420),
L1ChainID: big.NewInt(161),
}

// basic implementation of the Compressor interface that does no compression
type nonCompressor struct {
Expand Down Expand Up @@ -203,7 +211,7 @@ func TestForceCloseTxData(t *testing.T) {
for i, test := range tests {
out, err := ForceCloseTxData(test.frames)
if test.errors {
require.NotNil(t, err, "Should error on tc %v", i)
require.Error(t, err, "Should error on tc %v", i)
require.Nil(t, out, "Should return no value in tc %v", i)
} else {
require.NoError(t, err, "Should not error on tc %v", i)
Expand All @@ -218,18 +226,21 @@ func TestBlockToBatchValidity(t *testing.T) {
require.ErrorContains(t, err, "has no transactions")
}

func SpanChannelAndBatches(t *testing.T, target uint64, len int, algo CompressionAlgo) (*SpanChannelOut, []*SingularBatch) {
func SpanChannelAndBatches(t *testing.T, targetOutputSize uint64, numBatches int, algo CompressionAlgo, opts ...SpanChannelOutOption) (*SpanChannelOut, []*SingularBatch) {
// target is larger than one batch, but smaller than two batches
rng := rand.New(rand.NewSource(0x543331))
chainID := big.NewInt(rng.Int63n(1000))
chainID := rollupCfg.L2ChainID
txCount := 1
cout, err := NewSpanChannelOut(0, chainID, target, algo, rollup.NewChainSpec(&rollupCfg))
genesisTime := rollupCfg.Genesis.L2Time
cout, err := NewSpanChannelOut(genesisTime, chainID, targetOutputSize, algo, rollup.NewChainSpec(&rollupCfg), opts...)
require.NoError(t, err)
batches := make([]*SingularBatch, len)
batches := make([]*SingularBatch, 0, numBatches)
// adding the first batch should not cause an error
for i := 0; i < len; i++ {
for i := 0; i < numBatches; i++ {
singularBatch := RandomSingularBatch(rng, txCount, chainID)
batches[i] = singularBatch
// use default 2 sec block time
singularBatch.Timestamp = genesisTime + 420_000 + rollupCfg.BlockTime*uint64(i)
batches = append(batches, singularBatch)
}

return cout, batches
Expand Down Expand Up @@ -324,3 +335,84 @@ func SpanChannelOutClose(t *testing.T, algo CompressionAlgo) {
require.Greater(t, cout.compressor.Len(), 0)
require.Equal(t, rlpLen, cout.activeRLP().Len())
}

func TestSpanChannelOut_MaxBlocksPerSpanBatch(t *testing.T) {
// TODO: maybe two test cases, one filling up the last span batch perfectly, then one requiring init of the next
const (
numBatches = 16
maxBlocks = 4
numSpanBatches = numBatches / maxBlocks
outputSize = 11_000
)
l1Origin := eth.L1BlockRef{Number: 42_000, Hash: common.Hash{0xde, 0xad, 0x42}}
l2SafeHead := eth.L2BlockRef{}
cout, bs := SpanChannelAndBatches(t, outputSize, numBatches, Brotli, SCOWithMaxBlocksPerSpanBatch(maxBlocks))
for i, b := range bs {
b.EpochNum = rollup.Epoch(l1Origin.Number)
b.EpochHash = l1Origin.Hash
err := cout.AddSingularBatch(b, uint64(i))
if i != numBatches-1 {
require.NoErrorf(t, err, "iteration %d", i)
} else {
// adding last batch should not succeed
require.ErrorIs(t, err, ErrCompressorFull)
}

}
require.ErrorIs(t, cout.FullErr(), ErrCompressorFull)
require.Equal(t, maxBlocks, cout.spanBatch.GetBlockCount(),
"last block should still have been added to the span batch")
require.NoError(t, cout.Close())

// write cannel into a single frame
var frameBuf bytes.Buffer
fn, err := cout.OutputFrame(&frameBuf, outputSize+FrameV0OverHeadSize)
require.Zero(t, fn)
require.ErrorIs(t, err, io.EOF)

// now roundtrip to decode the batches
var frame Frame
require.NoError(t, frame.UnmarshalBinary(&frameBuf))
require.True(t, frame.IsLast)
spec := rollup.NewChainSpec(&rollupCfg)
ch := NewChannel(frame.ID, l1Origin)
require.False(t, ch.IsReady())
require.NoError(t, ch.AddFrame(frame, l1Origin))
require.True(t, ch.IsReady())
br, err := BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(0), true)
require.NoError(t, err)

sbs := make([]*SingularBatch, 0, numBatches-1)
for i := 0; i < numSpanBatches; i++ {
t.Logf("iteration %d", i)
expBlocks := maxBlocks
if i == numSpanBatches-1 {
// last span batch contains one less as adding the last went over the limit
expBlocks--
}

bd, err := br()
require.NoError(t, err)
require.EqualValues(t, SpanBatchType, bd.GetBatchType())
sb, err := DeriveSpanBatch(bd, rollupCfg.BlockTime, rollupCfg.Genesis.L2Time, cout.spanBatch.ChainID)
require.NoError(t, err)
require.Equal(t, expBlocks, sb.GetBlockCount())
sbs0, err := sb.GetSingularBatches([]eth.L1BlockRef{l1Origin}, l2SafeHead)
require.NoError(t, err)
// last span batch contains one less
require.Len(t, sbs0, expBlocks)
sbs = append(sbs, sbs0...)
}

// batch reader should be exhausted
_, err = br()
require.ErrorIs(t, err, io.EOF)

for i, batch := range sbs {
batch0 := bs[i]
// clear the expected parent hash, as GetSingularBatches doesn't set these yet
// we still compare timestamps and txs, which is enough
batch0.ParentHash = (common.Hash{})
require.Equalf(t, batch0, batch, "iteration %d", i)
}
}
82 changes: 74 additions & 8 deletions op-node/rollup/derive/span_channel_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package derive

import (
"bytes"

"crypto/rand"
"fmt"
"io"
Expand Down Expand Up @@ -37,6 +36,15 @@ type SpanChannelOut struct {
// spanBatch is the batch being built, which immutably holds genesis timestamp and chain ID, but otherwise can be reset
spanBatch *SpanBatch

// maxBlocksPerSpanBatch is an optional limit on the number of blocks per span batch.
// If non-zero, a new span batch will be started after the current span batch has
// reached this maximum.
maxBlocksPerSpanBatch int

// sealedRLPBytes stores the sealed number of input RLP bytes. This is used when maxBlocksPerSpanBatch is non-zero
// to seal full span batches (that have reached the max block count) in the rlp slices.
sealedRLPBytes int

chainSpec *rollup.ChainSpec
}

Expand All @@ -49,7 +57,15 @@ func (co *SpanChannelOut) setRandomID() error {
return err
}

func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64, compressionAlgo CompressionAlgo, chainSpec *rollup.ChainSpec) (*SpanChannelOut, error) {
type SpanChannelOutOption func(co *SpanChannelOut)

func SCOWithMaxBlocksPerSpanBatch(maxBlock int) SpanChannelOutOption {
return func(co *SpanChannelOut) {
co.maxBlocksPerSpanBatch = maxBlock
}
}

func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64, compressionAlgo CompressionAlgo, chainSpec *rollup.ChainSpec, opts ...SpanChannelOutOption) (*SpanChannelOut, error) {
c := &SpanChannelOut{
id: ChannelID{},
frame: 0,
Expand All @@ -67,22 +83,32 @@ func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSi
return nil, err
}

for _, opt := range opts {
opt(c)
}

return c, nil
}

func (co *SpanChannelOut) Reset() error {
co.closed = false
co.full = nil
co.frame = 0
co.sealedRLPBytes = 0
co.rlp[0].Reset()
co.rlp[1].Reset()
co.lastCompressedRLPSize = 0
co.compressor.Reset()
// TODO: resetSpanBatch
co.spanBatch = NewSpanBatch(co.spanBatch.GenesisTimestamp, co.spanBatch.ChainID)
// setting the new randomID is the only part of the reset that can fail
return co.setRandomID()
}

func (co *SpanChannelOut) resetSpanBatch() {
co.spanBatch = NewSpanBatch(co.spanBatch.GenesisTimestamp, co.spanBatch.ChainID)
}

// activeRLP returns the active RLP buffer using the current rlpIndex
func (co *SpanChannelOut) activeRLP() *bytes.Buffer {
return co.rlp[co.rlpIndex]
Expand Down Expand Up @@ -127,6 +153,7 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
return err
}

co.ensureOpenSpanBatch()
// update the SpanBatch with the SingularBatch
if err := co.spanBatch.AppendSingularBatch(batch, seqNum); err != nil {
return fmt.Errorf("failed to append SingularBatch to SpanBatch: %w", err)
Expand All @@ -137,10 +164,11 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
return fmt.Errorf("failed to convert SpanBatch into RawSpanBatch: %w", err)
}

// switch to the other buffer and reset it for new use
// (the RLP buffer which is being made inactive holds the RLP encoded span batch just before the new batch was added)
// switch to the other buffer and truncate it for new use
// (the RLP buffer which is being made inactive holds the RLP encoded span batch(es)
// just before the new batch was added)
co.swapRLP()
co.activeRLP().Reset()
co.resetActiveRLPToSealed()
if err = rlp.Encode(co.activeRLP(), NewBatchData(rawSpanBatch)); err != nil {
return fmt.Errorf("failed to encode RawSpanBatch into bytes: %w", err)
}
Expand Down Expand Up @@ -171,15 +199,15 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
// if the channel is now full, either return the compressed data, or the compressed previous data
if err := co.FullErr(); err != nil {
// if there is only one batch in the channel, it *must* be returned
if len(co.spanBatch.Batches) == 1 {
if co.spanBatch.GetBlockCount() == 1 {
return nil
}

// if there is more than one batch in the channel, we revert the last batch
// by switching the RLP buffer and doing a fresh compression
co.swapRLP()
if err := co.compress(); err != nil {
return err
if cerr := co.compress(); cerr != nil {
return cerr
}
// return the full error
return err
Expand All @@ -188,10 +216,48 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
return nil
}

func (co *SpanChannelOut) ensureOpenSpanBatch() {
if co.maxBlocksPerSpanBatch == 0 || co.spanBatch.GetBlockCount() < co.maxBlocksPerSpanBatch {
return
}
// we assume that the full span batch has been written to the last active rlp buffer
active, inactive := co.activeRLP(), co.inactiveRLP()
if inactive.Len() > active.Len() {
panic("inactive rlp unexpectedly larger")
}
co.sealedRLPBytes = active.Len()
// Copy active to inactive rlp buffer so both have the same sealed state
// and resetting works as intended.
inactive.Reset()
// err is guaranteed to always be nil
_, _ = inactive.Write(active.Bytes())
co.resetSpanBatch()
}

func (co *SpanChannelOut) resetActiveRLPToSealed() {
active := co.activeRLP()
// if active.Len() >= co.sealedRLPBytes {
active.Truncate(co.sealedRLPBytes)
// } else {
// // just when ensureOpenSpanBatch switched to a new span batch, one rlp buffer
// // will still contain the encoded span batch minus the last block, so we
// // "truncate" it to the other rlp buffer
// active.Reset()
// inactiveBytes := co.inactiveRLP().Bytes()
// if len(inactiveBytes) != co.sealedRLPBytes {
// panic("unexpected other rlp buffer length")
// }
// active.Write(inactiveBytes)
// }
}

// compress compresses the active RLP buffer and checks if the compressed data is over the target size.
// it resets all the compression buffers because Span Batches aren't meant to be compressed incrementally.
func (co *SpanChannelOut) compress() error {
co.compressor.Reset()
// we write a slice of the active RLP to the compressor, so the active RLP's
// buffer is not advanced as a ReadWriter, making it possible to later use
// Truncate.
if _, err := co.compressor.Write(co.activeRLP().Bytes()); err != nil {
return err
}
Expand Down

0 comments on commit b7d80e4

Please sign in to comment.