Skip to content

Commit

Permalink
op-node: Cleanup unsafe payload handling (#9661)
Browse files Browse the repository at this point in the history
* op-node: Cleanup unsafe payload handling

* op-e2e: Add regression test for snap sync bug
  • Loading branch information
trianglesphere committed Feb 27, 2024
1 parent 5cfb315 commit c87a469
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 6 deletions.
158 changes: 158 additions & 0 deletions op-e2e/actions/sync_test.go
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -219,6 +220,163 @@ func TestELSync(gt *testing.T) {
)
}

// TestELSyncTransitionstoCL tests that a verifier which starts with EL sync can switch back to a proper CL sync.
// It takes a sequencer & verifier through the following:
// 1. Build 10 unsafe blocks on the sequencer
// 2. Snap sync those blocks to the verifier
// 3. Build & insert 1 unsafe block from the sequencer to the verifier to end snap sync
// 4. Batch submit everything
// 5. Build 10 more unsafe blocks on the sequencer
// 6. Gossip in the highest block to the verifier. **Expect that it does not snap sync**
// 7. Then gossip the rest of the blocks to the verifier. Once this is complete it should pick up all of the unsafe blocks.
// Prior to this PR, the test would fail at this point.
// 8. Create 1 more block & batch submit everything & assert that the verifier picked up those blocks
func TestELSyncTransitionstoCL(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
sd := e2eutils.Setup(t, dp, defaultAlloc)
logger := testlog.Logger(t, log.LevelInfo)

captureLog, captureLogHandler := testlog.CaptureLogger(t, log.LevelInfo)

miner, seqEng, sequencer := setupSequencerTest(t, sd, logger)
batcher := NewL2Batcher(logger, sd.RollupCfg, DefaultBatcherCfg(dp), sequencer.RollupClient(), miner.EthClient(), seqEng.EthClient(), seqEng.EngineClient(t, sd.RollupCfg))
// Enable engine P2P sync
verEng, verifier := setupVerifier(t, sd, captureLog, miner.L1Client(t, sd.RollupCfg), miner.BlobStore(), &sync.Config{SyncMode: sync.ELSync})

seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), logger, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err)

miner.ActEmptyBlock(t)
sequencer.ActL2PipelineFull(t)

// Build 10 L1 blocks on the sequencer
for i := 0; i < 10; i++ {
// Build a L2 block
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
}

// Wait longer to peer. This tests flakes or takes a long time when the op-geth instances are not able to peer.
verEng.AddPeers(seqEng.Enode())

// Insert it on the verifier
seqHead, err := seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
seqStart, err := seqEngCl.PayloadByNumber(t.Ctx(), 1)
require.NoError(t, err)
verifier.ActL2InsertUnsafePayload(seqHead)(t)

require.Eventually(t,
func() bool {
return seqEng.PeerCount() > 0 && verEng.PeerCount() > 0
},
120*time.Second, 1500*time.Millisecond,
"Sequencer & Verifier must peer with each other for snap sync to work",
)

// Expect snap sync to download & execute the entire chain
// Verify this by checking that the verifier has the correct value for block 1
require.Eventually(t,
func() bool {
block, err := verifier.eng.L2BlockRefByNumber(t.Ctx(), 1)
if err != nil {
return false
}
return seqStart.ExecutionPayload.BlockHash == block.Hash
},
60*time.Second, 1500*time.Millisecond,
"verifier did not snap sync",
)
// Despite downloading the blocks, it has not finished finalizing
_, err = verifier.eng.L2BlockRefByLabel(t.Ctx(), "safe")
require.ErrorIs(t, err, ethereum.NotFound)

// Insert a block on the verifier to end snap sync
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
seqHead, err = seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
verifier.ActL2InsertUnsafePayload(seqHead)(t)

// Check that safe + finalized are there
id, err := verifier.eng.L2BlockRefByLabel(t.Ctx(), eth.Safe)
require.Equal(t, uint64(11), id.Number)
require.NoError(t, err)
id, err = verifier.eng.L2BlockRefByLabel(t.Ctx(), eth.Finalized)
require.Equal(t, uint64(11), id.Number)
require.NoError(t, err)

// Batch submit everything
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
batcher.ActSubmitAll(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)

// Verify that the batch submitted blocks are there now
id, err = sequencer.eng.L2BlockRefByLabel(t.Ctx(), eth.Safe)
require.NoError(t, err)
require.Equal(t, uint64(12), id.Number)
id, err = verifier.eng.L2BlockRefByLabel(t.Ctx(), eth.Safe)
require.NoError(t, err)
require.Equal(t, uint64(12), id.Number)

// Build another 10 L1 blocks on the sequencer
for i := 0; i < 10; i++ {
// Build a L2 block
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
}

// Now pass payloads to the derivation pipeline
// This is a little hacky that we have to manually switch between InsertBlock
// and UnsafeGossipReceive in the tests
seqHead, err = seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(seqHead)(t)
verifier.ActL2PipelineFull(t)
// Verify that the derivation pipeline did not request a sync to the new head. This is the core of the test, but a little fragile.
record := captureLogHandler.FindLog(testlog.NewMessageFilter("Forkchoice requested sync to new head"), testlog.NewAttributesFilter("number", "22"))
require.Nil(t, record, "The verifier should not request to sync to block number 22 because it is in CL mode, not EL mode at this point.")

for i := 13; i < 23; i++ {
seqHead, err = seqEngCl.PayloadByNumber(t.Ctx(), uint64(i))
require.NoError(t, err)
verifier.ActL2UnsafeGossipReceive(seqHead)(t)
}
verifier.ActL2PipelineFull(t)

// Verify that the unsafe blocks are there now
// This was failing prior to PR 9661 because op-node would attempt to immediately insert blocks into the EL inside the engine queue. op-geth
// would not be able to fetch the second range of blocks & it would wipe out the unsafe payloads queue because op-node thought that it had a
// higher unsafe block but op-geth did not.
id, err = verifier.eng.L2BlockRefByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
require.Equal(t, uint64(22), id.Number)

// Create 1 more block & batch submit everything
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
batcher.ActSubmitAll(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)

// Verify that the batch submitted blocks are there now
id, err = sequencer.eng.L2BlockRefByLabel(t.Ctx(), eth.Safe)
require.NoError(t, err)
require.Equal(t, uint64(23), id.Number)
id, err = verifier.eng.L2BlockRefByLabel(t.Ctx(), eth.Safe)
require.NoError(t, err)
require.Equal(t, uint64(23), id.Number)
}

func TestInvalidPayloadInSpanBatch(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
Expand Down
4 changes: 2 additions & 2 deletions op-node/rollup/derive/engine_queue.go
Expand Up @@ -180,7 +180,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engin
engine: l2Source,
metrics: metrics,
finalityData: make([]FinalityData, 0, finalityLookback),
unsafePayloads: NewPayloadsQueue(maxUnsafePayloadsMemory, payloadMemSize),
unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize),
prev: prev,
l1Fetcher: l1Fetcher,
syncCfg: syncCfg,
Expand Down Expand Up @@ -472,7 +472,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
}

// Ensure that the unsafe payload builds upon the current unsafe head
if eq.syncCfg.SyncMode != sync.ELSync && first.ParentHash != eq.ec.UnsafeL2Head().Hash {
if first.ParentHash != eq.ec.UnsafeL2Head().Hash {
if uint64(first.BlockNumber) == eq.ec.UnsafeL2Head().Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.ec.SafeL2Head().ID(), "unsafe", first.ID(), "payload", first.ID())
eq.unsafePayloads.Pop()
Expand Down
8 changes: 6 additions & 2 deletions op-node/rollup/derive/payloads_queue.go
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-service/eth"
)
Expand Down Expand Up @@ -81,15 +82,17 @@ type PayloadsQueue struct {
MaxSize uint64
blockHashes map[common.Hash]struct{}
SizeFn func(p *eth.ExecutionPayloadEnvelope) uint64
log log.Logger
}

func NewPayloadsQueue(maxSize uint64, sizeFn func(p *eth.ExecutionPayloadEnvelope) uint64) *PayloadsQueue {
func NewPayloadsQueue(log log.Logger, maxSize uint64, sizeFn func(p *eth.ExecutionPayloadEnvelope) uint64) *PayloadsQueue {
return &PayloadsQueue{
pq: nil,
currentSize: 0,
MaxSize: maxSize,
blockHashes: make(map[common.Hash]struct{}),
SizeFn: sizeFn,
log: log,
}
}

Expand Down Expand Up @@ -125,7 +128,8 @@ func (upq *PayloadsQueue) Push(e *eth.ExecutionPayloadEnvelope) error {
})
upq.currentSize += size
for upq.currentSize > upq.MaxSize {
upq.Pop()
env := upq.Pop()
upq.log.Info("Dropping payload from payload queue because the payload queue is too large", "id", env.ExecutionPayload.ID())
}
upq.blockHashes[e.ExecutionPayload.BlockHash] = struct{}{}
return nil
Expand Down
4 changes: 3 additions & 1 deletion op-node/rollup/derive/payloads_queue_test.go
Expand Up @@ -5,9 +5,11 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)

func TestPayloadsByNumber(t *testing.T) {
Expand Down Expand Up @@ -81,7 +83,7 @@ func envelope(payload *eth.ExecutionPayload) *eth.ExecutionPayloadEnvelope {
}

func TestPayloadsQueue(t *testing.T) {
pq := NewPayloadsQueue(payloadMemFixedCost*3, payloadMemSize)
pq := NewPayloadsQueue(testlog.Logger(t, log.LvlInfo), payloadMemFixedCost*3, payloadMemSize)
require.Equal(t, 0, pq.Len())
require.Nil(t, pq.Peek())
require.Nil(t, pq.Pop())
Expand Down
2 changes: 1 addition & 1 deletion op-node/rollup/driver/state.go
Expand Up @@ -348,7 +348,7 @@ func (s *Driver) eventLoop() {
if err := s.engineController.InsertUnsafePayload(s.driverCtx, envelope, ref); err != nil {
s.log.Warn("Failed to insert unsafe payload for EL sync", "id", envelope.ExecutionPayload.ID(), "err", err)
}
s.logSyncProgress("unsafe payload from sequencer")
s.logSyncProgress("unsafe payload from sequencer while in EL sync")
}
case newL1Head := <-s.l1HeadSig:
s.l1State.HandleNewL1HeadBlock(newL1Head)
Expand Down
28 changes: 28 additions & 0 deletions op-service/testlog/capturing.go
Expand Up @@ -62,6 +62,34 @@ func NewLevelFilter(level slog.Level) LogFilter {
}
}

func NewAttributesFilter(key, value string) LogFilter {
return func(r *slog.Record) bool {
found := false
r.Attrs(func(a slog.Attr) bool {
if a.Key == key && a.Value.String() == value {
found = true
return false
}
return true // try next
})
return found
}
}

func NewAttributesContainsFilter(key, value string) LogFilter {
return func(r *slog.Record) bool {
found := false
r.Attrs(func(a slog.Attr) bool {
if a.Key == key && strings.Contains(a.Value.String(), value) {
found = true
return false
}
return true // try next
})
return found
}
}

func NewMessageFilter(message string) LogFilter {
return func(r *slog.Record) bool {
return r.Message == message
Expand Down
17 changes: 17 additions & 0 deletions op-service/testlog/capturing_test.go
Expand Up @@ -41,3 +41,20 @@ func TestCaptureLogger(t *testing.T) {
require.EqualValues(t, 3, recOp.AttrValue("c"))
// Note: "b" attributes won't be visible on captured record
}

func TestCaptureLoggerAttributesFilter(t *testing.T) {
lgr, logs := testlog.CaptureLogger(t, log.LevelInfo)
msg := "foo bar"
lgr.Info(msg, "a", "test")
lgr.Info(msg, "a", "test 2")
lgr.Info(msg, "a", "random")
msgFilter := testlog.NewMessageFilter(msg)
attrFilter := testlog.NewAttributesFilter("a", "random")

rec := logs.FindLog(msgFilter, attrFilter)
require.Equal(t, msg, rec.Message)
require.EqualValues(t, "random", rec.AttrValue("a"))

recs := logs.FindLogs(msgFilter, attrFilter)
require.Len(t, recs, 1)
}

0 comments on commit c87a469

Please sign in to comment.