diff --git a/cmd/geth/main.go b/cmd/geth/main.go index fb89b7703f97..3b00ca6d941d 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -169,6 +169,7 @@ var ( utils.L1EndpointFlag, utils.L1ConfirmationsFlag, utils.L1DeploymentBlockFlag, + utils.L1DisableMessageQueueV2Flag, utils.CircuitCapacityCheckEnabledFlag, utils.CircuitCapacityCheckWorkersFlag, utils.RollupVerifyEnabledFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 5add55be7a47..57a1fd22e77d 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -854,6 +854,10 @@ var ( Name: "l1.sync.startblock", Usage: "L1 block height to start syncing from. Should be set to the L1 message queue deployment block number.", } + L1DisableMessageQueueV2Flag = &cli.BoolFlag{ + Name: "l1.disablemqv2", + Usage: "Disable L1 message queue v2", + } // Circuit capacity check settings CircuitCapacityCheckEnabledFlag = cli.BoolFlag{ @@ -1408,6 +1412,9 @@ func setL1(ctx *cli.Context, cfg *node.Config) { if ctx.GlobalIsSet(L1DeploymentBlockFlag.Name) { cfg.L1DeploymentBlock = ctx.GlobalUint64(L1DeploymentBlockFlag.Name) } + if ctx.GlobalIsSet(L1DisableMessageQueueV2Flag.Name) { + cfg.L1DisableMessageQueueV2 = ctx.GlobalBool(L1DisableMessageQueueV2Flag.Name) + } } func setSmartCard(ctx *cli.Context, cfg *node.Config) { diff --git a/core/block_validator.go b/core/block_validator.go index 9eb3accacdb4..fab1c4060b73 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -139,8 +139,52 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error { queueIndex := *nextQueueIndex L1SectionOver := false - it := rawdb.IterateL1MessagesFrom(v.bc.db, queueIndex) + // From EuclidV2 onwards there can't be any skipped L1 messages, and we use a different L1MessageQueueV2. + if v.config.IsEuclidV2(block.Time()) { + it := rawdb.IterateL1MessagesV2From(v.bc.db, queueIndex) + for _, tx := range block.Transactions() { + if !tx.IsL1MessageTx() { + L1SectionOver = true + continue // we do not verify L2 transactions here + } + + // check that L1 messages are before L2 transactions + if L1SectionOver { + return consensus.ErrInvalidL1MessageOrder + } + + // queue index must be equal to the expected value + txQueueIndex := tx.AsL1MessageTx().QueueIndex + if txQueueIndex != queueIndex { + return consensus.ErrInvalidL1MessageOrder + } + + if exists := it.Next(); !exists { + if err := it.Error(); err != nil { + log.Error("Unexpected DB error in ValidateL1Messages", "err", err, "queueIndex", txQueueIndex) + } + // the message in this block is not available in our local db. + // we'll reprocess this block at a later time. + return consensus.ErrMissingL1MessageData + } + + // check that the L1 message in the block is the same that we collected from L1 + msg := it.L1Message() + expectedHash := types.NewTx(&msg).Hash() + + if tx.Hash() != expectedHash { + return consensus.ErrUnknownL1Message + } + + // we expect L1 messages to be in order and contiguous + queueIndex++ + } + + return nil + } + + it := rawdb.IterateL1MessagesV1From(v.bc.db, queueIndex) for _, tx := range block.Transactions() { if !tx.IsL1MessageTx() { L1SectionOver = true diff --git a/core/blockchain.go b/core/blockchain.go index 57d82bc118a4..73e2a57fa21a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1853,6 +1853,18 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types l.BlockHash = blockHash } + // Make sure the block body is valid e.g. ordering of L1 messages is correct and continuous. 
+ if err = bc.validator.ValidateBody(fullBlock); err != nil { + bc.reportBlock(fullBlock, receipts, err) + return NonStatTy, fmt.Errorf("error validating block body %d: %w", fullBlock.Number().Uint64(), err) + } + + // Double check: even though we just built the block, make sure it is valid. + if err = bc.validator.ValidateState(fullBlock, statedb, receipts, gasUsed); err != nil { + bc.reportBlock(fullBlock, receipts, err) + return NonStatTy, fmt.Errorf("error validating block %d: %w", fullBlock.Number().Uint64(), err) + } + return bc.writeBlockWithState(fullBlock, receipts, logs, statedb, false) } diff --git a/core/rawdb/accessors_l1_message.go b/core/rawdb/accessors_l1_message.go index 4ae27c7b8ff0..a447160f12c1 100644 --- a/core/rawdb/accessors_l1_message.go +++ b/core/rawdb/accessors_l1_message.go @@ -141,9 +141,9 @@ type L1MessageIterator struct { maxQueueIndex uint64 } -// IterateL1MessagesFrom creates an L1MessageIterator that iterates over +// iterateL1MessagesFrom creates an L1MessageIterator that iterates over // all L1 message in the database starting at the provided enqueue index. -func IterateL1MessagesFrom(db ethdb.Database, fromQueueIndex uint64) L1MessageIterator { +func iterateL1MessagesFrom(db ethdb.Database, fromQueueIndex uint64) L1MessageIterator { start := encodeBigEndian(fromQueueIndex) it := db.NewIterator(l1MessagePrefix, start) keyLength := len(l1MessagePrefix) + 8 @@ -208,10 +208,72 @@ func (it *L1MessageIterator) Error() error { return it.inner.Error() } -// ReadL1MessagesFrom retrieves up to `maxCount` L1 messages starting at `startIndex`. -func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx { +// L1MessageV1Iterator is a wrapper around L1MessageIterator that allows us to iterate over L1 messages V1. +type L1MessageV1Iterator struct { + db ethdb.Database + v2StartIndex *uint64 + L1MessageIterator +} + +// IterateL1MessagesV1From yields a L1MessageV1Iterator with following behavior: +// - If fromQueueIndex >= L1MessageV2StartIndex: yield 0 messages. +// - Otherwise, simply yield all messages (guaranteed to be V1) starting from `fromQueueIndex` until `L1MessageV2StartIndex`. +func IterateL1MessagesV1From(db ethdb.Database, fromQueueIndex uint64) L1MessageV1Iterator { + return L1MessageV1Iterator{ + db: db, + v2StartIndex: ReadL1MessageV2StartIndex(db), + L1MessageIterator: iterateL1MessagesFrom(db, fromQueueIndex), + } +} + +func (it *L1MessageV1Iterator) Next() bool { + for it.L1MessageIterator.Next() { + // L1MessageV2StartIndex is the first queue index of L1 messages that are from L1MessageQueueV2. + // Therefore, we stop reading L1 messages V1 when we reach this index. + // We need to check in every iteration if not yet set as the start index can be set in the meantime when we are reading L1 messages. + if it.v2StartIndex == nil { + it.v2StartIndex = ReadL1MessageV2StartIndex(it.db) + } + + if it.v2StartIndex != nil && it.QueueIndex() >= *it.v2StartIndex { + return false + } + return true + } + return false +} + +// L1MessageV2Iterator is a wrapper around L1MessageIterator that allows us to iterate over L1 messages V2. +type L1MessageV2Iterator struct { + v2StartIndex *uint64 + L1MessageIterator +} + +// IterateL1MessagesV2From yields a L1MessageV2Iterator with following behavior: +// - If fromQueueIndex < v2StartIndex: yield 0 messages. +// - Otherwise, simply yield all messages (guaranteed to be v2) starting from `fromQueueIndex`. 
+func IterateL1MessagesV2From(db ethdb.Database, fromQueueIndex uint64) L1MessageV2Iterator { + v2StartIndex := ReadL1MessageV2StartIndex(db) + + return L1MessageV2Iterator{ + v2StartIndex: v2StartIndex, + L1MessageIterator: iterateL1MessagesFrom(db, fromQueueIndex), + } +} + +func (it *L1MessageV2Iterator) Next() bool { + if it.v2StartIndex == nil { + return false + } + + return it.L1MessageIterator.Next() && it.QueueIndex() >= *it.v2StartIndex +} + +// ReadL1MessagesV1From retrieves up to `maxCount` L1 messages V1 starting at `startIndex`. +// If startIndex is >= L1MessageV2StartIndex, this function returns an empty slice. +func ReadL1MessagesV1From(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx { msgs := make([]types.L1MessageTx, 0, maxCount) - it := IterateL1MessagesFrom(db, startIndex) + it := IterateL1MessagesV1From(db, startIndex) defer it.Release() index := startIndex @@ -223,7 +285,50 @@ func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types. // sanity check if msg.QueueIndex != index { log.Crit( - "Unexpected QueueIndex in ReadL1MessagesFrom", + "Unexpected QueueIndex in ReadL1MessagesV1From", + "expected", index, + "got", msg.QueueIndex, + "startIndex", startIndex, + "maxCount", maxCount, + ) + } + + msgs = append(msgs, msg) + index += 1 + count -= 1 + + iteratorL1MessageSizeGauge.Update(int64(unsafe.Sizeof(msg) + uintptr(cap(msg.Data)))) + + if msg.QueueIndex == it.maxQueueIndex { + break + } + } + + if err := it.Error(); err != nil { + log.Crit("Failed to read L1 messages", "err", err) + } + + return msgs +} + +// ReadL1MessagesV2From retrieves up to `maxCount` L1 messages V2 starting at `startIndex`. +// If startIndex is smaller than L1MessageV2StartIndex, this function returns an empty slice. +func ReadL1MessagesV2From(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx { + msgs := make([]types.L1MessageTx, 0, maxCount) + + it := IterateL1MessagesV2From(db, startIndex) + defer it.Release() + + index := startIndex + count := maxCount + + for count > 0 && it.Next() { + msg := it.L1Message() + + // sanity check + if msg.QueueIndex != index { + log.Crit( + "Unexpected QueueIndex in ReadL1MessagesV2From", "expected", index, "got", msg.QueueIndex, "startIndex", startIndex, @@ -275,3 +380,65 @@ func ReadFirstQueueIndexNotInL2Block(db ethdb.Reader, l2BlockHash common.Hash) * queueIndex := binary.BigEndian.Uint64(data) return &queueIndex } + +// WriteL1MessageV2StartIndex writes the start index of L1 messages that are from L1MessageQueueV2. +func WriteL1MessageV2StartIndex(db ethdb.KeyValueWriter, queueIndex uint64) { + value := big.NewInt(0).SetUint64(queueIndex).Bytes() + + if err := db.Put(l1MessageV2StartIndexKey, value); err != nil { + log.Crit("Failed to update L1MessageV2 start index", "err", err) + } +} + +// ReadL1MessageV2StartIndex retrieves the start index of L1 messages that are from L1MessageQueueV2. +func ReadL1MessageV2StartIndex(db ethdb.Reader) *uint64 { + data, err := db.Get(l1MessageV2StartIndexKey) + if err != nil && isNotFoundErr(err) { + return nil + } + if err != nil { + log.Crit("Failed to read L1MessageV2 start index from database", "err", err) + } + if len(data) == 0 { + return nil + } + + number := new(big.Int).SetBytes(data) + if !number.IsUint64() { + log.Crit("Unexpected number for L1MessageV2 start index", "number", number) + } + + res := number.Uint64() + return &res +} + +// WriteL1MessageV2FirstL1BlockNumber writes the first synced L1 block number for L1MessageV2. 
+func WriteL1MessageV2FirstL1BlockNumber(db ethdb.KeyValueWriter, l1BlockNumber uint64) { + value := big.NewInt(0).SetUint64(l1BlockNumber).Bytes() + + if err := db.Put(l1MessageV2FirstL1BlockNumberKey, value); err != nil { + log.Crit("Failed to update L1MessageV2 start index", "err", err) + } +} + +// ReadL1MessageV2FirstL1BlockNumber retrieves the first synced L1 block number for L1MessageV2. +func ReadL1MessageV2FirstL1BlockNumber(db ethdb.Reader) *uint64 { + data, err := db.Get(l1MessageV2FirstL1BlockNumberKey) + if err != nil && isNotFoundErr(err) { + return nil + } + if err != nil { + log.Crit("Failed to read L1MessageV2 first L1 block number from database", "err", err) + } + if len(data) == 0 { + return nil + } + + number := new(big.Int).SetBytes(data) + if !number.IsUint64() { + log.Crit("Unexpected number for L1MessageV2 first L1 block number", "number", number) + } + + res := number.Uint64() + return &res +} diff --git a/core/rawdb/accessors_l1_message_test.go b/core/rawdb/accessors_l1_message_test.go index 314d6a604a6e..6a596410b2bc 100644 --- a/core/rawdb/accessors_l1_message_test.go +++ b/core/rawdb/accessors_l1_message_test.go @@ -4,6 +4,8 @@ import ( "math/big" "testing" + "github.com/stretchr/testify/require" + "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/core/types" ) @@ -72,7 +74,7 @@ func TestIterateL1Message(t *testing.T) { t.Fatal("max index mismatch", "expected", 1000, "got", max) } - it := IterateL1MessagesFrom(db, 103) + it := iterateL1MessagesFrom(db, 103) defer it.Release() for ii := 2; ii < len(msgs); ii++ { @@ -104,7 +106,7 @@ func TestReadL1MessageTxRange(t *testing.T) { db := NewMemoryDatabase() WriteL1Messages(db, msgs) - got := ReadL1MessagesFrom(db, 101, 3) + got := ReadL1MessagesV1From(db, 101, 3) if len(got) != 3 { t.Fatal("Invalid length", "expected", 3, "got", len(got)) @@ -151,9 +153,83 @@ func TestIterationStopsAtMaxQueueIndex(t *testing.T) { WriteHighestSyncedQueueIndex(db, 102) // iteration should terminate at 102 and not read 103 - got := ReadL1MessagesFrom(db, 100, 10) + got := ReadL1MessagesV1From(db, 100, 10) if len(got) != 3 { t.Fatal("Invalid length", "expected", 3, "got", len(got)) } } + +func TestIterateL1MessagesV1From(t *testing.T) { + msgs := []types.L1MessageTx{ + newL1MessageTx(100), + newL1MessageTx(101), + newL1MessageTx(102), + newL1MessageTx(103), + newL1MessageTx(104), + } + + db := NewMemoryDatabase() + WriteL1Messages(db, msgs) + WriteL1MessageV2StartIndex(db, 103) + + it := IterateL1MessagesV1From(db, 100) + defer it.Release() + + for _, msg := range msgs { + if msg.QueueIndex < 103 { + require.True(t, it.Next(), "Iterator terminated early") + require.Equal(t, msg.QueueIndex, it.L1Message().QueueIndex, "Invalid result") + } else { + require.Falsef(t, it.Next(), "Iterator did not terminate, queueIndex %d", msg.QueueIndex) + } + } + + finished := !it.Next() + if !finished { + t.Fatal("Iterator did not terminate") + } +} + +func TestIterateL1MessagesV2From(t *testing.T) { + msgs := []types.L1MessageTx{ + newL1MessageTx(100), + newL1MessageTx(101), + newL1MessageTx(102), + newL1MessageTx(103), + newL1MessageTx(104), + } + + db := NewMemoryDatabase() + WriteL1Messages(db, msgs) + + // no L1MessageV2 in the database + { + it := IterateL1MessagesV2From(db, 100) + require.Falsef(t, it.Next(), "Iterator did not terminate") + it.Release() + } + + WriteL1MessageV2StartIndex(db, 103) + // L1MessageV2 in the database starting from 103 -> no iteration + { + it := IterateL1MessagesV2From(db, 100) + 
require.Falsef(t, it.Next(), "Iterator did not terminate") + it.Release() + } + + it := IterateL1MessagesV2From(db, 103) + defer it.Release() + + for _, msg := range msgs { + if msg.QueueIndex >= 103 { + require.True(t, it.Next(), "Iterator terminated early") + require.Equal(t, msg.QueueIndex, it.L1Message().QueueIndex, "Invalid result") + } + } + + finished := !it.Next() + if !finished { + t.Fatal("Iterator did not terminate") + } +} diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 47b29c77d840..16090569d7d6 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -109,6 +109,8 @@ var ( l1MessagePrefix = []byte("L1") // l1MessagePrefix + queueIndex (uint64 big endian) -> L1MessageTx firstQueueIndexNotInL2BlockPrefix = []byte("q") // firstQueueIndexNotInL2BlockPrefix + L2 block hash -> enqueue index highestSyncedQueueIndexKey = []byte("HighestSyncedQueueIndex") + l1MessageV2StartIndexKey = []byte("MessageQueueV2StartIndex") + l1MessageV2FirstL1BlockNumberKey = []byte("MessageQueueV2FirstL1BlockNumber") // Scroll rollup event store rollupEventSyncedL1BlockNumberKey = []byte("R-LastRollupEventSyncedL1BlockNumber") diff --git a/go.mod b/go.mod index 65740d79eedd..f739a26565b9 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/Azure/azure-storage-blob-go v0.7.0 github.com/VictoriaMetrics/fastcache v1.12.2 + github.com/agiledragon/gomonkey/v2 v2.12.0 github.com/aws/aws-sdk-go-v2 v1.2.0 github.com/aws/aws-sdk-go-v2/config v1.1.1 github.com/aws/aws-sdk-go-v2/credentials v1.1.1 diff --git a/miner/scroll_worker.go b/miner/scroll_worker.go index a6aa7c21c207..db2b355738a9 100644 --- a/miner/scroll_worker.go +++ b/miner/scroll_worker.go @@ -436,9 +436,34 @@ func (w *worker) updateSnapshot() { w.snapshotState = w.current.state.Copy() } +// collectPendingL1Messages reads pending L1 messages from the database. +// It returns a list of L1 messages that can be included in the block. Depending on the current +// block time, it reads L1 messages from either L1MessageQueueV1 or L1MessageQueueV2. +// It also makes sure that all L1 messages V1 are consumed before we activate EuclidV2 fork by backdating the block's time +// to the parent block's timestamp. func (w *worker) collectPendingL1Messages(startIndex uint64) []types.L1MessageTx { maxCount := w.chainConfig.Scroll.L1Config.NumL1MessagesPerBlock - return rawdb.ReadL1MessagesFrom(w.eth.ChainDb(), startIndex, maxCount) + + // If we are on EuclidV2, we need to read L1 messages from L1MessageQueueV2. + if w.chainConfig.IsEuclidV2(w.current.header.Time) { + parent := w.chain.CurrentHeader() + + // w.current would be the first block in the EuclidV2 fork + if !w.chainConfig.IsEuclidV2(parent.Time) { + // We need to make sure that all the L1 messages V1 are consumed before we activate EuclidV2 as with EuclidV2 + // only L1 messages V2 are allowed. 
+ l1MessagesV1 := rawdb.ReadL1MessagesV1From(w.eth.ChainDb(), startIndex, maxCount) + if len(l1MessagesV1) > 0 { + // backdate the block to the parent block's timestamp -> not yet EuclidV2 + w.current.header.Time = parent.Time + return l1MessagesV1 + } + } + + return rawdb.ReadL1MessagesV2From(w.eth.ChainDb(), startIndex, maxCount) + } + + return rawdb.ReadL1MessagesV1From(w.eth.ChainDb(), startIndex, maxCount) } // newWork diff --git a/miner/scroll_worker_test.go b/miner/scroll_worker_test.go index e890d9d6def4..db18e8673856 100644 --- a/miner/scroll_worker_test.go +++ b/miner/scroll_worker_test.go @@ -17,12 +17,14 @@ package miner import ( + "fmt" "math" "math/big" "math/rand" "testing" "time" + "github.com/agiledragon/gomonkey/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1237,3 +1239,119 @@ func TestRestartHeadCCC(t *testing.T) { // head should be rechecked by CCC require.NotNil(t, rawdb.ReadBlockRowConsumption(db, headHash)) } + +func newUint64(val uint64) *uint64 { return &val } + +// TestEuclidV2MessageQueue tests L1 messages are correctly processed and included in the block during the +// transition from Euclid to EuclidV2 hard fork. +// - Before EuclidV2 only L1 messages V1 can be included. +// - During the hard fork, we need to ensure that blocks are backdated and all L1 messages V1 are included before the hard fork time. +// - After EuclidV2 only L1 messages V2 can be included. +func TestEuclidV2HardForkMessageQueue(t *testing.T) { + // patch time.Now() to be able to simulate hard fork time + patches := gomonkey.NewPatches() + defer patches.Reset() + + // EuclidV2 hard fork time, leave a big gap so that we can test before and after the hard fork time + euclidV2Time := uint64(10000) + + var timeCount int64 + patches.ApplyFunc(time.Now, func() time.Time { + timeCount++ + return time.Unix(timeCount, 0) + }) + + var ( + engine consensus.Engine + chainConfig *params.ChainConfig + db = rawdb.NewMemoryDatabase() + ) + msgs := []types.L1MessageTx{ + {QueueIndex: 0, Gas: 21016, To: &common.Address{3}, Data: []byte{0x01}, Sender: common.Address{4}}, + {QueueIndex: 1, Gas: 21016, To: &common.Address{1}, Data: []byte{0x01}, Sender: common.Address{2}}, + {QueueIndex: 2, Gas: 21016, To: &common.Address{1}, Data: []byte{0x01}, Sender: common.Address{2}}, + {QueueIndex: 3, Gas: 21016, To: &common.Address{1}, Data: []byte{0x01}, Sender: common.Address{2}}, + {QueueIndex: 4, Gas: 21016, To: &common.Address{1}, Data: []byte{0x01}, Sender: common.Address{2}}, + {QueueIndex: 5, Gas: 21016, To: &common.Address{1}, Data: []byte{0x01}, Sender: common.Address{2}}, + } + rawdb.WriteL1Messages(db, msgs) + rawdb.WriteL1MessageV2StartIndex(db, 4) + + chainConfig = params.AllCliqueProtocolChanges.Clone() + chainConfig.Clique = ¶ms.CliqueConfig{Period: 1, Epoch: 30000} + engine = clique.New(chainConfig.Clique, db) + + chainConfig.Scroll.L1Config = ¶ms.L1Config{ + NumL1MessagesPerBlock: 1, + } + chainConfig.Scroll.FeeVaultAddress = &common.Address{} + chainConfig.Scroll.UseZktrie = false + + chainConfig.EuclidTime = newUint64(0) + chainConfig.EuclidV2Time = newUint64(euclidV2Time) + w, b := newTestWorker(t, chainConfig, engine, db, 0) + defer w.close() + + // This test chain imports the mined blocks. + b.genesis.MustCommit(db) + chain, _ := core.NewBlockChain(db, nil, b.chain.Config(), engine, vm.Config{ + Debug: true, + Tracer: vm.NewStructLogger(&vm.LogConfig{EnableMemory: true, EnableReturnData: true})}, nil, nil) + defer chain.Stop() + + // Wait for mined blocks. 
+ sub := w.mux.Subscribe(core.NewMinedBlockEvent{}) + defer sub.Unsubscribe() + + // Start mining! + w.start() + + var block1Time uint64 + for i := 0; i < 6; i++ { + select { + case ev := <-sub.Chan(): + // After we received the first block, we activate EuclidV2 + if i == 0 { + timeCount = int64(euclidV2Time) + } + + block := ev.Data.(core.NewMinedBlockEvent).Block + fmt.Println("block", block.NumberU64(), block.Time()) + _, err := chain.InsertChain([]*types.Block{block}) + require.NoError(t, err, "failed to insert new mined block %d", block.NumberU64()) + require.Equal(t, 1, len(block.Transactions())) + + queueIndex := block.Transactions()[0].AsL1MessageTx().QueueIndex + require.Equal(t, uint64(i), queueIndex) + + switch i { + case 0: + block1Time = block.Time() + case 1, 2, 3: + // pre EuclidV2, we should include 1 L1 message V1 per block. + // we expect backdated blocks (same time as parent) and all L1 messages V1 to be included before the hard fork time. + + if i == 1 { + require.GreaterOrEqual(t, block.Time(), block1Time, "block %d", block.NumberU64()) + block1Time = block.Time() // due to concurrent mining it might be that block2 is mined before the hard fork time is set + } + // make sure the block is backdated + require.Equal(t, block1Time, block.Time(), "block %d", block.NumberU64()) + // since the block contains L1 message V1 it needs to be included before the hard fork time + require.Less(t, block.Time(), euclidV2Time, "block %d", block.NumberU64()) + case 4, 5: + // after EuclidV2 and when all L1 messages are consumed, we should include 1 L1 message V2 per block + + require.GreaterOrEqual(t, block.Time(), euclidV2Time, "block %d", block.NumberU64()) + } + + // make sure DB is updated correctly + queueIndexNotInDB := rawdb.ReadFirstQueueIndexNotInL2Block(db, block.Hash()) + require.NotNil(t, queueIndexNotInDB) + require.Equal(t, uint64(i+1), *queueIndexNotInDB) + + case <-time.After(3 * time.Second): + t.Fatalf("timeout") + } + } +} diff --git a/node/config.go b/node/config.go index 2dc3207dfdca..6e7133a95129 100644 --- a/node/config.go +++ b/node/config.go @@ -197,6 +197,8 @@ type Config struct { L1Confirmations rpc.BlockNumber `toml:",omitempty"` // L1 bridge deployment block number L1DeploymentBlock uint64 `toml:",omitempty"` + // Explicitly disable L1 message queue V2 and only query from L1 message queue V1 (before EuclidV2) + L1DisableMessageQueueV2 bool `toml:",omitempty"` // Is daSyncingEnabled DaSyncingEnabled bool `toml:",omitempty"` } diff --git a/params/config.go b/params/config.go index 6216d3c83f28..befc38771401 100644 --- a/params/config.go +++ b/params/config.go @@ -687,10 +687,12 @@ type ScrollConfig struct { // L1Config contains the l1 parameters needed to sync l1 contract events (e.g., l1 messages, commit/revert/finalize batches) in the sequencer type L1Config struct { - L1ChainId uint64 `json:"l1ChainId,string,omitempty"` - L1MessageQueueAddress common.Address `json:"l1MessageQueueAddress,omitempty"` - NumL1MessagesPerBlock uint64 `json:"numL1MessagesPerBlock,string,omitempty"` - ScrollChainAddress common.Address `json:"scrollChainAddress,omitempty"` + L1ChainId uint64 `json:"l1ChainId,string,omitempty"` + L1MessageQueueAddress common.Address `json:"l1MessageQueueAddress,omitempty"` + L1MessageQueueV2Address common.Address `json:"l1MessageQueueV2Address,omitempty"` // TODO: set address once known + L1MessageQueueV2DeploymentBlock uint64 `json:"l1MessageQueueV2DeploymentBlock,omitempty"` // TODO: set block number once known + NumL1MessagesPerBlock uint64 
`json:"numL1MessagesPerBlock,string,omitempty"` + ScrollChainAddress common.Address `json:"scrollChainAddress,omitempty"` } func (c *L1Config) String() string { diff --git a/params/version.go b/params/version.go index 0612d402f1e8..8f45867a8849 100644 --- a/params/version.go +++ b/params/version.go @@ -24,7 +24,7 @@ import ( const ( VersionMajor = 5 // Major version component of the current release VersionMinor = 8 // Minor version component of the current release - VersionPatch = 7 // Patch version component of the current release + VersionPatch = 8 // Patch version component of the current release VersionMeta = "mainnet" // Version metadata to append to the version string ) diff --git a/rollup/sync_service/bindings.go b/rollup/sync_service/bindings.go index 1c2b1393e91c..84e5b3e7a59e 100644 --- a/rollup/sync_service/bindings.go +++ b/rollup/sync_service/bindings.go @@ -32,7 +32,7 @@ type L1MessageQueueFilterer struct { contract *bind.BoundContract // Generic contract wrapper for the low level calls } -// NewL1MessageQueueFilterer creates a new log filterer instance of L1MessageQueue, bound to a specific deployed contract. +// NewL1MessageQueueFilterer creates a new log filtererV1 instance of L1MessageQueue, bound to a specific deployed contract. func NewL1MessageQueueFilterer(address common.Address, filterer bind.ContractFilterer) (*L1MessageQueueFilterer, error) { contract, err := bindL1MessageQueue(address, nil, nil, filterer) if err != nil { diff --git a/rollup/sync_service/bridge_client.go b/rollup/sync_service/bridge_client.go index 51ae3b02ce3e..b9dc9870e849 100644 --- a/rollup/sync_service/bridge_client.go +++ b/rollup/sync_service/bridge_client.go @@ -16,15 +16,23 @@ import ( // BridgeClient is a wrapper around EthClient that adds // methods for conveniently collecting L1 messages. 
type BridgeClient struct { - client EthClient - confirmations rpc.BlockNumber - l1MessageQueueAddress common.Address - filterer *L1MessageQueueFilterer + client EthClient + confirmations rpc.BlockNumber + + l1MessageQueueV1Address common.Address + filtererV1 *L1MessageQueueFilterer + + enableMessageQueueV2 bool + l1MessageQueueV2Address common.Address + filtererV2 *L1MessageQueueFilterer } -func newBridgeClient(ctx context.Context, l1Client EthClient, l1ChainId uint64, confirmations rpc.BlockNumber, l1MessageQueueAddress common.Address) (*BridgeClient, error) { - if l1MessageQueueAddress == (common.Address{}) { - return nil, errors.New("must pass non-zero l1MessageQueueAddress to BridgeClient") +func newBridgeClient(ctx context.Context, l1Client EthClient, l1ChainId uint64, confirmations rpc.BlockNumber, l1MessageQueueV1Address common.Address, enableMessageQueueV2 bool, l1MessageQueueV2Address common.Address) (*BridgeClient, error) { + if l1MessageQueueV1Address == (common.Address{}) { + return nil, errors.New("must pass non-zero l1MessageQueueV1Address to BridgeClient") + } + if enableMessageQueueV2 && l1MessageQueueV2Address == (common.Address{}) { + return nil, errors.New("must pass non-zero l1MessageQueueV2Address to BridgeClient") } // sanity check: compare chain IDs @@ -36,16 +44,29 @@ func newBridgeClient(ctx context.Context, l1Client EthClient, l1ChainId uint64, return nil, fmt.Errorf("unexpected chain ID, expected = %v, got = %v", l1ChainId, got) } - filterer, err := NewL1MessageQueueFilterer(l1MessageQueueAddress, l1Client) + filtererV1, err := NewL1MessageQueueFilterer(l1MessageQueueV1Address, l1Client) if err != nil { - return nil, fmt.Errorf("failed to initialize L1MessageQueueFilterer, err = %w", err) + return nil, fmt.Errorf("failed to initialize L1MessageQueueV1Filterer, err = %w", err) + } + + var filtererV2 *L1MessageQueueFilterer + if enableMessageQueueV2 { + filtererV2, err = NewL1MessageQueueFilterer(l1MessageQueueV2Address, l1Client) + if err != nil { + return nil, fmt.Errorf("failed to initialize L1MessageQueueV2Filterer, err = %w", err) + } } client := BridgeClient{ - client: l1Client, - confirmations: confirmations, - l1MessageQueueAddress: l1MessageQueueAddress, - filterer: filterer, + client: l1Client, + confirmations: confirmations, + + l1MessageQueueV1Address: l1MessageQueueV1Address, + filtererV1: filtererV1, + + enableMessageQueueV2: enableMessageQueueV2, + l1MessageQueueV2Address: l1MessageQueueV2Address, + filtererV2: filtererV2, } return &client, nil @@ -53,30 +74,70 @@ func newBridgeClient(ctx context.Context, l1Client EthClient, l1ChainId uint64, // fetchMessagesInRange retrieves and parses all L1 messages between the // provided from and to L1 block numbers (inclusive). -func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64) ([]types.L1MessageTx, error) { +func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64, queryL1MessagesV1 bool) ([]types.L1MessageTx, []types.L1MessageTx, error) { log.Trace("BridgeClient fetchMessagesInRange", "fromBlock", from, "toBlock", to) + var msgsV1, msgsV2 []types.L1MessageTx + opts := bind.FilterOpts{ Start: from, End: &to, Context: ctx, } - it, err := c.filterer.FilterQueueTransaction(&opts, nil, nil) - if err != nil { - return nil, err + + // Query L1MessageQueueV1 if enabled. We disable querying of L1MessageQueueV1 once L1MessageQueueV2 is enabled, + // and we have received the first QueueTransaction event from L1MessageQueueV2. 
+ if queryL1MessagesV1 { + it, err := c.filtererV1.FilterQueueTransaction(&opts, nil, nil) + if err != nil { + return nil, nil, err + } + + for it.Next() { + event := it.Event + log.Trace("Received new L1 QueueTransaction event from L1MessageQueueV1", "event", event) + + if !event.GasLimit.IsUint64() { + return nil, nil, fmt.Errorf("invalid QueueTransaction event: QueueIndex = %v, GasLimit = %v", event.QueueIndex, event.GasLimit) + } + + msgsV1 = append(msgsV1, types.L1MessageTx{ + QueueIndex: event.QueueIndex, + Gas: event.GasLimit.Uint64(), + To: &event.Target, + Value: event.Value, + Data: event.Data, + Sender: event.Sender, + }) + } + + if err = it.Error(); err != nil { + return nil, nil, err + } + } + + // We allow to explicitly enable/disable querying of L1MessageQueueV2. This is useful for running the node without + // MessageQueueV2 available on L1 or for testing purposes. + if !c.enableMessageQueueV2 { + return msgsV1, nil, nil } - var msgs []types.L1MessageTx + // We always query L1MessageQueueV2. Before EuclidV2 L1 upgrade tx this will return an empty list as we don't use + // L1MessageQueueV2 to enqueue L1 messages before EuclidV2. + it, err := c.filtererV2.FilterQueueTransaction(&opts, nil, nil) + if err != nil { + return nil, nil, err + } for it.Next() { event := it.Event - log.Trace("Received new L1 QueueTransaction event", "event", event) + log.Trace("Received new L1 QueueTransaction event from L1MessageQueueV2", "event", event) if !event.GasLimit.IsUint64() { - return nil, fmt.Errorf("invalid QueueTransaction event: QueueIndex = %v, GasLimit = %v", event.QueueIndex, event.GasLimit) + return nil, nil, fmt.Errorf("invalid QueueTransaction event: QueueIndex = %v, GasLimit = %v", event.QueueIndex, event.GasLimit) } - msgs = append(msgs, types.L1MessageTx{ + msgsV2 = append(msgsV2, types.L1MessageTx{ QueueIndex: event.QueueIndex, Gas: event.GasLimit.Uint64(), To: &event.Target, @@ -86,11 +147,11 @@ func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64 }) } - if err := it.Error(); err != nil { - return nil, err + if err = it.Error(); err != nil { + return nil, nil, err } - return msgs, nil + return msgsV1, msgsV2, nil } func (c *BridgeClient) getLatestConfirmedBlockNumber(ctx context.Context) (uint64, error) { diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 9239e210b8ed..65743b5e7e36 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -65,7 +65,7 @@ func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, node return nil, fmt.Errorf("missing L1 config in genesis") } - client, err := newBridgeClient(ctx, l1Client, genesisConfig.Scroll.L1Config.L1ChainId, nodeConfig.L1Confirmations, genesisConfig.Scroll.L1Config.L1MessageQueueAddress) + client, err := newBridgeClient(ctx, l1Client, genesisConfig.Scroll.L1Config.L1ChainId, nodeConfig.L1Confirmations, genesisConfig.Scroll.L1Config.L1MessageQueueAddress, !nodeConfig.L1DisableMessageQueueV2, genesisConfig.Scroll.L1Config.L1MessageQueueV2Address) if err != nil { return nil, fmt.Errorf("failed to initialize bridge client: %w", err) } @@ -78,6 +78,24 @@ func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, node latestProcessedBlock = *block } + // reset synced height so that previous V2 messages are re-fetched in case a node upgraded after V2 deployment. 
+ // otherwise there's no way for the node to know if it missed any messages of the V2 queue (as it was not querying it before) + // but continued to query the V1 queue (which after V2 deployment does not contain any messages anymore). + // this is a one-time operation and will not be repeated on subsequent restarts. + if genesisConfig.Scroll.L1Config.L1MessageQueueV2DeploymentBlock > 0 && + genesisConfig.Scroll.L1Config.L1MessageQueueV2DeploymentBlock < latestProcessedBlock { // node synced after V2 deployment + + // this means the node has never synced V2 messages before -> we need to reset the synced height to re-fetch V2 messages. + // Resetting the synced height will not cause any inconsistency as V1 messages are only available before V2 deployment block + // and V2 messages are only available after V2 deployment block. -> rawdb.ReadHighestSyncedQueueIndex(s.db) and the next expected index + // will still be consistent. + initialV2L1Block := rawdb.ReadL1MessageV2FirstL1BlockNumber(db) + if initialV2L1Block == nil { + latestProcessedBlock = genesisConfig.Scroll.L1Config.L1MessageQueueV2DeploymentBlock + log.Info("Resetting L1 message sync height to fetch previous V2 messages", "L1 block", latestProcessedBlock) + } + } + ctx, cancel := context.WithCancel(ctx) service := SyncService{ @@ -175,6 +193,9 @@ func (s *SyncService) fetchMessages() { // keep track of next queue index we're expecting to see queueIndex := rawdb.ReadHighestSyncedQueueIndex(s.db) + // read start index of very first L1MessageV2 from database + l1MessageV2StartIndex := rawdb.ReadL1MessageV2StartIndex(s.db) + batchWriter := s.db.NewBatch() numBlocksPendingDbWrite := uint64(0) numMessagesPendingDbWrite := 0 @@ -227,7 +248,8 @@ func (s *SyncService) fetchMessages() { to = latestConfirmed } - msgs, err := s.client.fetchMessagesInRange(s.ctx, from, to) + queryL1MessagesV1 := l1MessageV2StartIndex == nil + msgsV1, msgsV2, err := s.client.fetchMessagesInRange(s.ctx, from, to, queryL1MessagesV1) if err != nil { // flush pending writes to database if from > 0 { @@ -237,6 +259,17 @@ func (s *SyncService) fetchMessages() { return } + // write start index of very first L1MessageV2 to database. This is true only once. + if len(msgsV2) > 0 && l1MessageV2StartIndex == nil { + firstL1MessageV2 := msgsV2[0] + log.Info("Received first L1Message from MessageQueueV2", "queueIndex", firstL1MessageV2.QueueIndex, "L1 blockNumber", to) + l1MessageV2StartIndex = &firstL1MessageV2.QueueIndex + rawdb.WriteL1MessageV2StartIndex(batchWriter, firstL1MessageV2.QueueIndex) + rawdb.WriteL1MessageV2FirstL1BlockNumber(batchWriter, to) + } + + msgs := append(msgsV1, msgsV2...) + if len(msgs) > 0 { log.Debug("Received new L1 events", "fromBlock", from, "toBlock", to, "count", len(msgs)) rawdb.WriteL1Messages(batchWriter, msgs) // collect messages in memory
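As a review aid, the split between the V1 and V2 message accessors introduced in this patch can be exercised with a short, self-contained sketch. It mirrors the new unit tests in `core/rawdb/accessors_l1_message_test.go` and uses only functions added or already present in the patch (`WriteL1Messages`, `WriteL1MessageV2StartIndex`, `ReadL1MessagesV1From`, `ReadL1MessagesV2From`); the expected counts in the comments follow from the documented iterator semantics, and the import paths are assumed from this repository's layout. Treat it as an illustrative sketch, not part of the change itself.

```go
package main

import (
	"fmt"

	"github.com/scroll-tech/go-ethereum/common"
	"github.com/scroll-tech/go-ethereum/core/rawdb"
	"github.com/scroll-tech/go-ethereum/core/types"
)

func main() {
	db := rawdb.NewMemoryDatabase()

	// Write six consecutive L1 messages (queue indices 0..5).
	msgs := make([]types.L1MessageTx, 0, 6)
	for i := uint64(0); i < 6; i++ {
		msgs = append(msgs, types.L1MessageTx{
			QueueIndex: i,
			Gas:        21016,
			To:         &common.Address{1},
			Data:       []byte{0x01},
			Sender:     common.Address{2},
		})
	}
	rawdb.WriteL1Messages(db, msgs)

	// Mark queue index 4 as the first message enqueued via L1MessageQueueV2.
	rawdb.WriteL1MessageV2StartIndex(db, 4)

	// V1 reads stop just before the V2 start index.
	v1 := rawdb.ReadL1MessagesV1From(db, 0, 10)
	fmt.Println("V1 messages:", len(v1)) // expected: 4 (indices 0..3)

	// V2 reads yield nothing when starting below the V2 start index...
	fmt.Println("V2 from 0:", len(rawdb.ReadL1MessagesV2From(db, 0, 10))) // expected: 0

	// ...and yield the remaining messages when starting at or after it.
	v2 := rawdb.ReadL1MessagesV2From(db, 4, 10)
	fmt.Println("V2 from 4:", len(v2)) // expected: 2 (indices 4..5)
}
```

This is the same boundary behavior the block validator and the miner rely on: before EuclidV2 only messages below the V2 start index are considered, and from EuclidV2 onwards only messages at or above it.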