diff --git a/core/rawdb/rollup_indexes.go b/core/rawdb/rollup_indexes.go index 4ce5532c1..4f275e6af 100644 --- a/core/rawdb/rollup_indexes.go +++ b/core/rawdb/rollup_indexes.go @@ -44,3 +44,22 @@ func WriteHeadQueueIndex(db ethdb.KeyValueWriter, index uint64) { log.Crit("Failed to store queue index", "err", err) } } + +func ReadHeadVerifiedIndex(db ethdb.KeyValueReader) *uint64 { + data, _ := db.Get(headVerifiedIndexKey) + if len(data) == 0 { + return nil + } + ret := new(big.Int).SetBytes(data).Uint64() + return &ret +} + +func WriteHeadVerifiedIndex(db ethdb.KeyValueWriter, index uint64) { + value := new(big.Int).SetUint64(index).Bytes() + if index == 0 { + value = []byte{0} + } + if err := db.Put(headVerifiedIndexKey, value); err != nil { + log.Crit("Failed to store verfied index", "err", err) + } +} diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index ed091fa23..622a67831 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -58,8 +58,10 @@ var ( // headIndexKey tracks the last processed ctc index headIndexKey = []byte("LastIndex") - // headQueueIndexKey tracks th last processed queue index + // headQueueIndexKey tracks the last processed queue index headQueueIndexKey = []byte("LastQueueIndex") + // headVerifiedIndexKey tracks the latest verified index + headVerifiedIndexKey = []byte("LastVerifiedIndex") preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db diff --git a/core/vm/evm.go b/core/vm/evm.go index 32fd66337..4d45a62c0 100644 --- a/core/vm/evm.go +++ b/core/vm/evm.go @@ -66,7 +66,8 @@ func run(evm *EVM, contract *Contract, input []byte, readOnly bool) ([]byte, err } else { log.Debug("Calling Known Contract", "ID", evm.Id, "Name", name, "Method", method.RawName) if method.RawName == "ovmREVERT" { - log.Debug("Contract Threw Exception", "ID", evm.Id, "asciified", string(input)) + hex := hexutil.Encode(input) + log.Debug("Contract Threw Exception", "ID", evm.Id, "asciified", hex) } } } diff --git a/eth/api_backend.go b/eth/api_backend.go index c59269836..14ef24b77 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -70,9 +70,10 @@ func (b *EthAPIBackend) GetEthContext() (uint64, uint64) { return bn, ts } -func (b *EthAPIBackend) GetRollupContext() (uint64, uint64) { +func (b *EthAPIBackend) GetRollupContext() (uint64, uint64, uint64) { i := uint64(0) q := uint64(0) + v := uint64(0) index := b.eth.syncService.GetLatestIndex() if index != nil { i = *index @@ -81,7 +82,11 @@ func (b *EthAPIBackend) GetRollupContext() (uint64, uint64) { if queueIndex != nil { q = *queueIndex } - return i, q + verifiedIndex := b.eth.syncService.GetLatestVerifiedIndex() + if verifiedIndex != nil { + v = *verifiedIndex + } + return i, q, v } // ChainConfig returns the active chain configuration. @@ -312,7 +317,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) return fmt.Errorf("Calldata cannot be larger than %d, sent %d", b.MaxCallDataSize, len(signedTx.Data())) } } - return b.eth.syncService.ApplyTransaction(signedTx) + return b.eth.syncService.ValidateAndApplySequencerTransaction(signedTx) } // OVM Disabled return b.eth.txPool.AddLocal(signedTx) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 6da198359..2f3e765bc 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1832,8 +1832,9 @@ type EthContext struct { Timestamp uint64 `json:"timestamp"` } type RollupContext struct { - Index uint64 `json:"index"` - QueueIndex uint64 `json:"queueIndex"` + Index uint64 `json:"index"` + QueueIndex uint64 `json:"queueIndex"` + VerifiedIndex uint64 `json:"verifiedIndex"` } type rollupInfo struct { @@ -1850,7 +1851,7 @@ func (api *PublicRollupAPI) GetInfo(ctx context.Context) rollupInfo { } syncing := api.b.IsSyncing() bn, ts := api.b.GetEthContext() - index, queueIndex := api.b.GetRollupContext() + index, queueIndex, verifiedIndex := api.b.GetRollupContext() return rollupInfo{ Mode: mode, @@ -1860,8 +1861,9 @@ func (api *PublicRollupAPI) GetInfo(ctx context.Context) rollupInfo { Timestamp: ts, }, RollupContext: RollupContext{ - Index: index, - QueueIndex: queueIndex, + Index: index, + QueueIndex: queueIndex, + VerifiedIndex: verifiedIndex, }, } } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 0ff8643a2..f4fd9e942 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -91,7 +91,7 @@ type Backend interface { IsVerifier() bool IsSyncing() bool GetEthContext() (uint64, uint64) - GetRollupContext() (uint64, uint64) + GetRollupContext() (uint64, uint64, uint64) GasLimit() uint64 GetDiff(*big.Int) (diffdb.Diff, error) } diff --git a/les/api_backend.go b/les/api_backend.go index d0007cf92..7d260efa4 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -58,8 +58,8 @@ func (b *LesApiBackend) GetEthContext() (uint64, uint64) { return 0, 0 } -func (b *LesApiBackend) GetRollupContext() (uint64, uint64) { - return 0, 0 +func (b *LesApiBackend) GetRollupContext() (uint64, uint64, uint64) { + return 0, 0, 0 } func (b *LesApiBackend) IsSyncing() bool { diff --git a/miner/worker.go b/miner/worker.go index e7a202007..5f210c0d5 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -863,32 +863,14 @@ func (w *worker) commitNewTx(tx *types.Transaction) error { tstart := time.Now() parent := w.chain.CurrentBlock() - // The L1Timestamp will always be set for a transaction - // coming from a batch submission because the transaction - // has been included in the canonical transaction chain. - // The only time that L1Timestamp is zero is for queue - // origin sequencer transactions that have yet to be included - // in the canonical transaction chain, meaning this code - // path is only relevant for the sequencer. - if tx.L1Timestamp() == 0 { - ts := w.eth.SyncService().GetLatestL1Timestamp() - bn := w.eth.SyncService().GetLatestL1BlockNumber() - tx.SetL1Timestamp(ts) - tx.SetL1BlockNumber(bn) - } timestamp := tx.L1Timestamp() + if timestamp < parent.Time() { + log.Error("Monotonicity violation detected", "index", parent.NumberU64()) + } num := parent.Number() - // Fill in the index field in the tx meta if it is `nil`. - // This should only ever happen in the case of the sequencer - // receiving a queue origin sequencer transaction. The verifier - // should always receive transactions with an index as they - // have already been confirmed in the canonical transaction chain. - // Use the parent's block number because the CTC is 0 indexed. if meta := tx.GetMeta(); meta.Index == nil { - index := num.Uint64() - meta.Index = &index - tx.SetTransactionMeta(meta) + log.Error("Index not found on transaction") } header := &types.Header{ ParentHash: parent.Hash(), diff --git a/rollup/client.go b/rollup/client.go index e2d9b4757..d8ca386a0 100644 --- a/rollup/client.go +++ b/rollup/client.go @@ -13,12 +13,6 @@ import ( "github.com/go-resty/resty/v2" ) -/** - * GET /enqueue/index/{index} - * GET /transaction/index/{index} - * GET /eth/context/latest - */ - type Batch struct { Index uint64 `json:"index"` Root common.Hash `json:"root,omitempty"` @@ -86,11 +80,13 @@ type decoded struct { type RollupClient interface { GetEnqueue(index uint64) (*types.Transaction, error) GetLatestEnqueue() (*types.Transaction, error) - GetTransaction(index uint64) (*types.Transaction, error) - GetLatestTransaction() (*types.Transaction, error) - GetEthContext(index uint64) (*EthContext, error) + GetTransaction(uint64, string) (*types.Transaction, error) + GetLatestTransaction(string) (*types.Transaction, error) + GetEthContext(uint64) (*EthContext, error) GetLatestEthContext() (*EthContext, error) GetLastConfirmedEnqueue() (*types.Transaction, error) + GetLatestTransactionBatch() (*Batch, []*types.Transaction, error) + GetTransactionBatch(uint64) (*Batch, []*types.Transaction, error) SyncStatus() (*SyncStatus, error) } @@ -104,6 +100,11 @@ type TransactionResponse struct { Batch *Batch `json:"batch"` } +type TransactionBatchResponse struct { + Batch *Batch `json:"batch"` + Transactions []*transaction `json:"transactions"` +} + func NewClient(url string, chainID *big.Int) *Client { client := resty.New() client.SetHostURL(url) @@ -115,7 +116,6 @@ func NewClient(url string, chainID *big.Int) *Client { } } -// This needs to return a transaction instead func (c *Client) GetEnqueue(index uint64) (*types.Transaction, error) { str := strconv.FormatUint(index, 10) response, err := c.client.R(). @@ -217,45 +217,45 @@ func (c *Client) GetLatestEnqueue() (*types.Transaction, error) { return tx, nil } -func transactionResponseToTransaction(res *TransactionResponse, signer *types.OVMSigner) (*types.Transaction, error) { +func transactionResponseToTransaction(res *transaction, signer *types.OVMSigner) (*types.Transaction, error) { // `nil` transactions are not found - if res.Transaction == nil { + if res == nil { return nil, nil } // The queue origin must be either sequencer of l1, otherwise // it is considered an unknown queue origin and will not be processed var queueOrigin types.QueueOrigin - if res.Transaction.QueueOrigin == "sequencer" { + if res.QueueOrigin == "sequencer" { queueOrigin = types.QueueOriginSequencer - } else if res.Transaction.QueueOrigin == "l1" { + } else if res.QueueOrigin == "l1" { queueOrigin = types.QueueOriginL1ToL2 } else { - return nil, fmt.Errorf("Unknown queue origin: %s", res.Transaction.QueueOrigin) + return nil, fmt.Errorf("Unknown queue origin: %s", res.QueueOrigin) } // The transaction type must be EIP155 or EthSign. Throughout this // codebase, it is referred to as "sighash type" but it could actually // be generalized to transaction type. Right now the only different // types use a different signature hashing scheme. var sighashType types.SignatureHashType - if res.Transaction.Type == "EIP155" { + if res.Type == "EIP155" { sighashType = types.SighashEIP155 - } else if res.Transaction.Type == "ETH_SIGN" { + } else if res.Type == "ETH_SIGN" { sighashType = types.SighashEthSign } else { - return nil, fmt.Errorf("Unknown transaction type: %s", res.Transaction.Type) + return nil, fmt.Errorf("Unknown transaction type: %s", res.Type) } // Transactions that have been decoded are // Queue Origin Sequencer transactions - if res.Transaction.Decoded != nil { - nonce := res.Transaction.Decoded.Nonce - to := res.Transaction.Decoded.Target + if res.Decoded != nil { + nonce := res.Decoded.Nonce + to := res.Decoded.Target value := new(big.Int) // Note: there are two gas limits, one top level and // another on the raw transaction itself. Maybe maxGasLimit // for the top level? - gasLimit := res.Transaction.Decoded.GasLimit - gasPrice := new(big.Int).SetUint64(res.Transaction.Decoded.GasPrice) - data := res.Transaction.Decoded.Data + gasLimit := res.Decoded.GasLimit + gasPrice := new(big.Int).SetUint64(res.Decoded.GasPrice) + data := res.Decoded.Data var tx *types.Transaction if to == (common.Address{}) { @@ -265,22 +265,22 @@ func transactionResponseToTransaction(res *TransactionResponse, signer *types.OV } txMeta := types.NewTransactionMeta( - new(big.Int).SetUint64(res.Transaction.BlockNumber), - res.Transaction.Timestamp, - res.Transaction.Origin, + new(big.Int).SetUint64(res.BlockNumber), + res.Timestamp, + res.Origin, sighashType, queueOrigin, - &res.Transaction.Index, - res.Transaction.QueueIndex, - res.Transaction.Data, + &res.Index, + res.QueueIndex, + res.Data, ) tx.SetTransactionMeta(txMeta) - r, s := res.Transaction.Decoded.Signature.R, res.Transaction.Decoded.Signature.S + r, s := res.Decoded.Signature.R, res.Decoded.Signature.S sig := make([]byte, crypto.SignatureLength) copy(sig[32-len(r):32], r) copy(sig[64-len(s):64], s) - sig[64] = byte(res.Transaction.Decoded.Signature.V) + sig[64] = byte(res.Decoded.Signature.V) tx, err := tx.WithSignature(signer, sig[:]) if err != nil { @@ -293,37 +293,40 @@ func transactionResponseToTransaction(res *TransactionResponse, signer *types.OV // The transaction is either an L1 to L2 transaction or it does not have a // known deserialization nonce := uint64(0) - if res.Transaction.QueueOrigin == "l1" { - if res.Transaction.QueueIndex == nil { + if res.QueueOrigin == "l1" { + if res.QueueIndex == nil { return nil, errors.New("Queue origin L1 to L2 without a queue index") } - nonce = *res.Transaction.QueueIndex + nonce = *res.QueueIndex } - target := res.Transaction.Target - gasLimit := res.Transaction.GasLimit - data := res.Transaction.Data - origin := res.Transaction.Origin + target := res.Target + gasLimit := res.GasLimit + data := res.Data + origin := res.Origin tx := types.NewTransaction(nonce, target, big.NewInt(0), gasLimit, big.NewInt(0), data) txMeta := types.NewTransactionMeta( - new(big.Int).SetUint64(res.Transaction.BlockNumber), - res.Transaction.Timestamp, + new(big.Int).SetUint64(res.BlockNumber), + res.Timestamp, origin, sighashType, queueOrigin, - &res.Transaction.Index, - res.Transaction.QueueIndex, - res.Transaction.Data, + &res.Index, + res.QueueIndex, + res.Data, ) tx.SetTransactionMeta(txMeta) return tx, nil } -func (c *Client) GetTransaction(index uint64) (*types.Transaction, error) { +func (c *Client) GetTransaction(index uint64, backend string) (*types.Transaction, error) { str := strconv.FormatUint(index, 10) response, err := c.client.R(). SetPathParams(map[string]string{ "index": str, }). + SetQueryParams(map[string]string{ + "backend": backend, + }). SetResult(&TransactionResponse{}). Get("/transaction/index/{index}") @@ -332,14 +335,16 @@ func (c *Client) GetTransaction(index uint64) (*types.Transaction, error) { } res, ok := response.Result().(*TransactionResponse) if !ok { - return nil, errors.New("") + return nil, fmt.Errorf("Cannot get transaction %d", index) } - - return transactionResponseToTransaction(res, c.signer) + return transactionResponseToTransaction(res.Transaction, c.signer) } -func (c *Client) GetLatestTransaction() (*types.Transaction, error) { +func (c *Client) GetLatestTransaction(backend string) (*types.Transaction, error) { response, err := c.client.R(). + SetQueryParams(map[string]string{ + "backend": backend, + }). SetResult(&TransactionResponse{}). Get("/transaction/latest") @@ -348,10 +353,10 @@ func (c *Client) GetLatestTransaction() (*types.Transaction, error) { } res, ok := response.Result().(*TransactionResponse) if !ok { - return nil, errors.New("") + return nil, errors.New("Cannot get latest transaction") } - return transactionResponseToTransaction(res, c.signer) + return transactionResponseToTransaction(res.Transaction, c.signer) } func (c *Client) GetEthContext(blockNumber uint64) (*EthContext, error) { @@ -371,7 +376,6 @@ func (c *Client) GetEthContext(blockNumber uint64) (*EthContext, error) { if !ok { return nil, errors.New("Cannot parse EthContext") } - return context, nil } @@ -439,3 +443,53 @@ func (c *Client) SyncStatus() (*SyncStatus, error) { return status, nil } + +func (c *Client) GetLatestTransactionBatch() (*Batch, []*types.Transaction, error) { + response, err := c.client.R(). + SetResult(&TransactionBatchResponse{}). + Get("/batch/transaction/latest") + + if err != nil { + return nil, nil, errors.New("Cannot get latest transaction batch") + } + txBatch, ok := response.Result().(*TransactionBatchResponse) + if !ok { + return nil, nil, fmt.Errorf("Cannot parse transaciton batch response") + } + return parseTransactionBatchResponse(txBatch, c.signer) +} + +func (c *Client) GetTransactionBatch(index uint64) (*Batch, []*types.Transaction, error) { + str := strconv.FormatUint(index, 10) + response, err := c.client.R(). + SetResult(&TransactionBatchResponse{}). + SetPathParams(map[string]string{ + "index": str, + }). + Get("/batch/transaction/index/{index}") + + if err != nil { + return nil, nil, fmt.Errorf("Cannot get transaction batch %d", index) + } + txBatch, ok := response.Result().(*TransactionBatchResponse) + if !ok { + return nil, nil, fmt.Errorf("Cannot parse transaciton batch response") + } + return parseTransactionBatchResponse(txBatch, c.signer) +} + +func parseTransactionBatchResponse(txBatch *TransactionBatchResponse, signer *types.OVMSigner) (*Batch, []*types.Transaction, error) { + if txBatch == nil { + return nil, nil, nil + } + batch := txBatch.Batch + txs := make([]*types.Transaction, len(txBatch.Transactions)) + for i, tx := range txBatch.Transactions { + transaction, err := transactionResponseToTransaction(tx, signer) + if err != nil { + return nil, nil, fmt.Errorf("Cannot parse transaction batch: %w", err) + } + txs[i] = transaction + } + return batch, txs, nil +} diff --git a/rollup/sync_service.go b/rollup/sync_service.go index 5f9f99f5d..b3ede6d01 100644 --- a/rollup/sync_service.go +++ b/rollup/sync_service.go @@ -38,16 +38,20 @@ type SyncService struct { scope event.SubscriptionScope txFeed event.Feed txLock sync.Mutex + loopLock sync.Mutex enable bool eth1ChainId uint64 bc *core.BlockChain txpool *core.TxPool client RollupClient syncing atomic.Value + chainHeadSub event.Subscription OVMContext OVMContext confirmationDepth uint64 pollInterval time.Duration timestampRefreshThreshold time.Duration + chainHeadCh chan core.ChainHeadEvent + backend string } // NewSyncService returns an initialized sync service @@ -76,6 +80,11 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co timestampRefreshThreshold = time.Minute * 15 } + // TODO: parse the backend from a CLI flag + // the backend is used to query from the data transport layer + // transactions from a particular backend + backend := "l1" + // Layer 2 chainid chainID := bc.Config().ChainID if chainID == nil { @@ -93,13 +102,17 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co syncing: atomic.Value{}, bc: bc, txpool: txpool, + chainHeadCh: make(chan core.ChainHeadEvent, 1), eth1ChainId: cfg.Eth1ChainId, client: client, db: db, pollInterval: pollInterval, timestampRefreshThreshold: timestampRefreshThreshold, + backend: backend, } + service.chainHeadSub = service.bc.SubscribeChainHeadEvent(service.chainHeadCh) + // Initial sync service setup if it is enabled. This code depends on // a remote server that indexes the layer one contracts. Place this // code behind this if statement so that this can run without the @@ -139,20 +152,10 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co ts := service.GetLatestL1Timestamp() log.Info("Initialized Latest L1 Info", "blocknumber", bn, "timestamp", ts) - var i, q string index := service.GetLatestIndex() queueIndex := service.GetLatestEnqueueIndex() - if index == nil { - i = "" - } else { - i = strconv.FormatUint(*index, 10) - } - if queueIndex == nil { - q = "" - } else { - q = strconv.FormatUint(*queueIndex, 10) - } - log.Info("Initialized Eth Context", "index", i, "queue-index", q) + verifiedIndex := service.GetLatestVerifiedIndex() + log.Info("Initialized Eth Context", "index", stringify(index), "queue-index", stringify(queueIndex), "verified-index", verifiedIndex) // The sequencer needs to sync to the tip at start up // By setting the sync status to true, it will prevent RPC calls. @@ -161,7 +164,6 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co service.setSyncStatus(true) } } - return &service, nil } @@ -173,31 +175,29 @@ func (s *SyncService) ensureClient() error { return nil } -// Start initializes the service, connecting to Ethereum1 and starting the -// subservices required for the operation of the SyncService. -// txs through syncservice go to mempool.locals -// txs through rpc go to mempool.remote +// Start initializes the service func (s *SyncService) Start() error { if !s.enable { + log.Info("Sync Service not initialized") return nil } log.Info("Initializing Sync Service", "eth1-chainid", s.eth1ChainId) - // When a sequencer, be sure to sync to the tip of the ctc before allowing - // user transactions. - if !s.verifier { - err := s.syncTransactionsToTip() + if s.verifier { + go s.VerifierLoop() + } else { + // The sequencer must sync the transactions to the tip and the + // pending queue transactions on start before setting sync status + // to false and opening up the RPC to accept transactions. + err := s.syncTransactionsToTip(s.backend) if err != nil { return fmt.Errorf("Cannot sync transactions to the tip: %w", err) } - // TODO: This should also sync the enqueue'd transactions that have not - // been synced yet + err = s.syncQueueToTip() + if err != nil { + log.Error("Sequencer cannot sync queue", "msg", err) + } s.setSyncStatus(false) - } - - if s.verifier { - go s.VerifierLoop() - } else { go s.SequencerLoop() } return nil @@ -241,22 +241,19 @@ func (s *SyncService) initializeLatestL1(ctcDeployHeight *big.Int) error { s.SetLatestL1Timestamp(tx.L1Timestamp()) s.SetLatestL1BlockNumber(tx.L1BlockNumber().Uint64()) } - // Only the sequencer cares about latest queue index - if !s.verifier { - queueIndex := s.GetLatestEnqueueIndex() - if queueIndex == nil { - enqueue, err := s.client.GetLastConfirmedEnqueue() - if err != nil { - return fmt.Errorf("Cannot fetch last confirmed queue tx: %w", err) - } - // There are no enqueues yet - if enqueue == nil { - return nil - } - queueIndex = enqueue.GetMeta().QueueIndex + queueIndex := s.GetLatestEnqueueIndex() + if queueIndex == nil { + enqueue, err := s.client.GetLastConfirmedEnqueue() + if err != nil { + return fmt.Errorf("Cannot fetch last confirmed queue tx: %w", err) + } + // There are no enqueues yet + if enqueue == nil { + return nil } - s.SetLatestEnqueueIndex(queueIndex) + queueIndex = enqueue.GetMeta().QueueIndex } + s.SetLatestEnqueueIndex(queueIndex) return nil } @@ -282,6 +279,8 @@ func (s *SyncService) IsSyncing() bool { // Stop will close the open channels and cancel the goroutines // started by this service. func (s *SyncService) Stop() error { + s.chainHeadSub.Unsubscribe() + close(s.chainHeadCh) s.scope.Close() if s.cancel != nil { @@ -290,223 +289,6 @@ func (s *SyncService) Stop() error { return nil } -func (s *SyncService) VerifierLoop() { - log.Info("Starting Verifier Loop", "poll-interval", s.pollInterval, "timestamp-refresh-threshold", s.timestampRefreshThreshold) - for { - // The verifier polls for ctc transactions. - // the ctc transactions are extending the chain. - latest, err := s.client.GetLatestTransaction() - if err != nil { - log.Error("Cannot fetch transaction") - continue - } - - if latest == nil { - time.Sleep(s.pollInterval) - continue - } - - var start uint64 - if s.GetLatestIndex() == nil { - start = 0 - } else { - start = *s.GetLatestIndex() + 1 - } - end := *latest.GetMeta().Index - log.Info("Polling transactions", "start", start, "end", end) - for i := start; i <= end; i++ { - tx, err := s.client.GetTransaction(i) - if err != nil { - log.Error("Cannot get tx in loop", "index", i) - continue - } - log.Debug("Applying transaction", "index", i) - err = s.maybeApplyTransaction(tx) - if err != nil { - log.Error("Cannot apply transaction", "msg", err) - } - s.SetLatestIndex(&i) - } - time.Sleep(s.pollInterval) - } -} - -func (s *SyncService) SequencerLoop() { - log.Info("Starting Sequencer Loop", "poll-interval", s.pollInterval, "timestamp-refresh-threshold", s.timestampRefreshThreshold) - for { - // Only the sequencer needs to poll for enqueue transactions - // and then can choose when to apply them. We choose to apply - // transactions such that it makes for efficient batch submitting. - // Place as many L1ToL2 transactions in the same context as possible - // by executing them one after another. - // TODO: break this routine out into a function so that lock - // management is more simple. For now, be sure to unlock before - // each outer continue - s.txLock.Lock() - latest, err := s.client.GetLatestEnqueue() - if err != nil { - log.Error("Cannot get latest enqueue") - s.txLock.Unlock() - time.Sleep(s.pollInterval) - continue - } - // This should never happen unless the backend is empty - if latest == nil { - log.Debug("No enqueue transactions found") - s.txLock.Unlock() - time.Sleep(s.pollInterval) - continue - } - // Compare the remote latest queue index to the local latest - // queue index. If the remote latest queue index is greater - // than the local latest queue index, be sure to ingest more - // enqueued transactions - var start uint64 - if s.GetLatestEnqueueIndex() == nil { - start = 0 - } else { - start = *s.GetLatestEnqueueIndex() + 1 - } - end := *latest.GetMeta().QueueIndex - - log.Info("Polling enqueued transactions", "start", start, "end", end) - for i := start; i <= end; i++ { - enqueue, err := s.client.GetEnqueue(i) - if err != nil { - log.Error("Cannot get enqueue in loop", "index", i, "message", err) - continue - } - - if enqueue == nil { - log.Debug("No enqueue transaction found") - break - } - - // This should never happen - if enqueue.L1BlockNumber() == nil { - log.Error("No blocknumber for enqueue", "index", i, "timestamp", enqueue.L1Timestamp(), "blocknumber", enqueue.L1BlockNumber()) - continue - } - - // Update the timestamp and blocknumber based on the enqueued - // transactions - if enqueue.L1Timestamp() > s.GetLatestL1Timestamp() { - ts := enqueue.L1Timestamp() - bn := enqueue.L1BlockNumber().Uint64() - s.SetLatestL1Timestamp(ts) - s.SetLatestL1BlockNumber(bn) - log.Info("Updated Eth Context from enqueue", "index", i, "timestamp", ts, "blocknumber", bn) - } - - log.Debug("Applying enqueue transaction", "index", i) - err = s.applyTransaction(enqueue) - if err != nil { - log.Error("Cannot apply transaction", "msg", err) - } - - s.SetLatestEnqueueIndex(enqueue.GetMeta().QueueIndex) - if enqueue.GetMeta().Index == nil { - latest := s.GetLatestIndex() - index := uint64(0) - if latest != nil { - index = *latest + 1 - } - s.SetLatestIndex(&index) - } else { - s.SetLatestIndex(enqueue.GetMeta().Index) - } - } - s.txLock.Unlock() - - // Update the execution context's timestamp and blocknumber - // over time. This is only necessary for the sequencer. - context, err := s.client.GetLatestEthContext() - if err != nil { - log.Error("Cannot get latest eth context", "msg", err) - continue - } - current := time.Unix(int64(s.GetLatestL1Timestamp()), 0) - next := time.Unix(int64(context.Timestamp), 0) - if next.Sub(current) > s.timestampRefreshThreshold { - log.Info("Updating Eth Context", "timetamp", context.Timestamp, "blocknumber", context.BlockNumber) - s.SetLatestL1BlockNumber(context.BlockNumber) - s.SetLatestL1Timestamp(context.Timestamp) - } - time.Sleep(s.pollInterval) - } -} - -// This function must sync all the way to the tip -// TODO: it should then sync all of the enqueue transactions -func (s *SyncService) syncTransactionsToTip() error { - // Then set up a while loop that only breaks when the latest - // transaction does not change through two runs of the loop. - // The latest transaction can change during the timeframe of - // all of the transactions being sync'd. - for { - // This function must be sure to sync all the way to the tip. - // First query the latest transaction - latest, err := s.client.GetLatestTransaction() - if err != nil { - log.Error("Cannot get latest transaction", "msg", err) - time.Sleep(time.Second * 2) - continue - } - if latest == nil { - log.Info("No transactions to sync") - return nil - } - tipHeight := latest.GetMeta().Index - index := rawdb.ReadHeadIndex(s.db) - start := uint64(0) - if index != nil { - start = *index + 1 - } - - log.Info("Syncing transactions to tip", "start", start, "end", *tipHeight) - for i := start; i <= *tipHeight; i++ { - tx, err := s.client.GetTransaction(i) - if err != nil { - log.Error("Cannot get transaction", "index", i, "msg", err) - time.Sleep(time.Second * 2) - continue - } - // The transaction does not yet exist in the ctc - if tx == nil { - index := latest.GetMeta().Index - if index == nil { - return fmt.Errorf("Unexpected nil index") - } - return fmt.Errorf("Transaction %d not found when %d is latest", i, *index) - } - err = s.maybeApplyTransaction(tx) - if err != nil { - return fmt.Errorf("Cannot apply transaction: %w", err) - } - if err != nil { - log.Error("Cannot ingest transaction", "index", i) - } - s.SetLatestIndex(tx.GetMeta().Index) - if types.QueueOrigin(tx.QueueOrigin().Uint64()) == types.QueueOriginL1ToL2 { - queueIndex := tx.GetMeta().QueueIndex - s.SetLatestEnqueueIndex(queueIndex) - } - } - // Be sure to check that no transactions came in while - // the above loop was running - post, err := s.client.GetLatestTransaction() - if err != nil { - return fmt.Errorf("Cannot get latest transaction: %w", err) - } - // These transactions should always have an index since they - // are already in the ctc. - if *latest.GetMeta().Index == *post.GetMeta().Index { - log.Info("Done syncing transactions to tip") - return nil - } - } -} - // Methods for safely accessing and storing the latest // L1 blocknumber and timestamp. These are held in memory. func (s *SyncService) GetLatestL1Timestamp() uint64 { @@ -529,6 +311,14 @@ func (s *SyncService) GetLatestEnqueueIndex() *uint64 { return rawdb.ReadHeadQueueIndex(s.db) } +func (s *SyncService) GetNextEnqueueIndex() uint64 { + latest := s.GetLatestEnqueueIndex() + if latest == nil { + return 0 + } + return *latest + 1 +} + func (s *SyncService) SetLatestEnqueueIndex(index *uint64) { if index != nil { rawdb.WriteHeadQueueIndex(s.db, *index) @@ -541,97 +331,168 @@ func (s *SyncService) SetLatestIndex(index *uint64) { } } +func (s *SyncService) SetLatestVerifiedIndex(index *uint64) { + if index != nil { + rawdb.WriteHeadVerifiedIndex(s.db, *index) + } +} + +func (s *SyncService) GetLatestVerifiedIndex() *uint64 { + return rawdb.ReadHeadVerifiedIndex(s.db) +} + +func (s *SyncService) GetNextVerifiedIndex() uint64 { + index := s.GetLatestVerifiedIndex() + if index == nil { + return 0 + } + return *index + 1 +} + func (s *SyncService) GetLatestIndex() *uint64 { return rawdb.ReadHeadIndex(s.db) } -// reorganize will reorganize to directly to the index passed in. -// The caller must handle the offset relative to the ctc. -func (s *SyncService) reorganize(index uint64) error { - if index == 0 { - return nil - } - err := s.bc.SetHead(index) - if err != nil { - return fmt.Errorf("Cannot reorganize in syncservice: %w", err) +func (s *SyncService) GetNextIndex() uint64 { + latest := s.GetLatestIndex() + if latest == nil { + return 0 } + return *latest + 1 +} - // TODO: make sure no off by one error here - s.SetLatestIndex(&index) - - // When in sequencer mode, be sure to roll back the latest queue - // index as well. - if !s.verifier { - enqueue, err := s.client.GetLastConfirmedEnqueue() - if err != nil { - return fmt.Errorf("cannot reorganize: %w", err) - } - s.SetLatestEnqueueIndex(enqueue.GetMeta().QueueIndex) +func (s *SyncService) applyTransaction(tx *types.Transaction) error { + if tx.GetMeta().Index != nil { + return s.applyIndexedTransaction(tx) } - log.Info("Reorganizing", "height", index) - return nil + return s.applyTransactionToTip(tx) } -// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and -// starts sending event to the given channel. -func (s *SyncService) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { - return s.scope.Track(s.txFeed.Subscribe(ch)) +// applyIndexedTransaction applys an indexed transaction +func (s *SyncService) applyIndexedTransaction(tx *types.Transaction) error { + if tx == nil { + return errors.New("Transaction is nil in applyIndexedTransaction") + } + index := tx.GetMeta().Index + if index == nil { + return errors.New("No index found in applyIndexedTransaction") + } + next := s.GetNextIndex() + if *index == next { + return s.applyTransactionToTip(tx) + } + if *index < next { + return s.applyHistoricalTransaction(tx) + } + return fmt.Errorf("Received tx at index %d when looking for %d", *index, next) } -// maybeApplyTransaction will potentially apply the transaction after first -// inspecting the local database. This is mean to prevent transactions from -// being replayed. -func (s *SyncService) maybeApplyTransaction(tx *types.Transaction) error { - log.Debug("Maybe applying transaction", "hash", tx.Hash().Hex()) +func (s *SyncService) applyHistoricalTransaction(tx *types.Transaction) error { + if tx == nil { + return errors.New("Transaction is nil in applyHistoricalTransaction") + } index := tx.GetMeta().Index if index == nil { - return fmt.Errorf("nil index in maybeApplyTransaction") + return errors.New("No index is found in applyHistoricalTransaction") } - // Handle off by one + // Handle the off by one block := s.bc.GetBlockByNumber(*index + 1) - - // The transaction has yet to be played, so it is safe to apply if block == nil { - err := s.applyTransaction(tx) - if err != nil { - return fmt.Errorf("Maybe apply transaction failed on index %d: %w", *index, err) - } - return nil + return fmt.Errorf("Block %d is not found", *index+1) } - // There is already a transaction at that index, so check - // for its equality. txs := block.Transactions() if len(txs) != 1 { - log.Info("block", "txs", len(txs), "number", block.Number().Uint64()) - return fmt.Errorf("More than 1 transaction in block") + return fmt.Errorf("More than one transaction found in block %d", *index+1) } - if isCtcTxEqual(tx, txs[0]) { - log.Info("Matching transaction found", "index", *index) - } else { - log.Warn("Non matching transaction found", "index", *index) + if !isCtcTxEqual(tx, txs[0]) { + err := s.bc.SetHead(*index) + if err != nil { + return fmt.Errorf("Cannot reorganize in syncservice: %w", err) + } + return s.applyTransactionToTip(tx) } return nil } -// Lower level API used to apply a transaction, must only be used with -// transactions that came from L1. -func (s *SyncService) applyTransaction(tx *types.Transaction) error { +func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error { + s.txLock.Lock() + defer s.txLock.Unlock() + if tx.L1Timestamp() == 0 { + ts := s.GetLatestL1Timestamp() + bn := s.GetLatestL1BlockNumber() + tx.SetL1Timestamp(ts) + tx.SetL1BlockNumber(bn) + } else if tx.L1Timestamp() > s.GetLatestL1Timestamp() { + ts := tx.L1Timestamp() + bn := tx.L1BlockNumber() + s.SetLatestL1Timestamp(ts) + s.SetLatestL1BlockNumber(bn.Uint64()) + } else if tx.L1Timestamp() < s.GetLatestL1Timestamp() { + log.Warn("Timestamp monotonicity violation") + } + + if tx.GetMeta().Index == nil { + index := s.GetLatestEnqueueIndex() + if index == nil { + tx.SetIndex(0) + } else { + tx.SetIndex(*index + 1) + } + } + s.SetLatestIndex(tx.GetMeta().Index) + if tx.GetMeta().QueueIndex != nil { + s.SetLatestEnqueueIndex(tx.GetMeta().QueueIndex) + } + tx = fixType(tx) txs := types.Transactions{tx} s.txFeed.Send(core.NewTxsEvent{Txs: txs}) + // Block until the transaction has been added to the chain + <-s.chainHeadCh + + return nil +} + +func (s *SyncService) applyBatchedTransaction(tx *types.Transaction) error { + if tx == nil { + return errors.New("nil transaction passed into applyBatchedTransaction") + } + index := tx.GetMeta().Index + if index == nil { + return errors.New("No index found on transaction") + } + err := s.applyIndexedTransaction(tx) + if err != nil { + return fmt.Errorf("Cannot apply batched transaction: %w", err) + } + s.SetLatestVerifiedIndex(index) return nil } // Higher level API for applying transactions. Should only be called for // queue origin sequencer transactions, as the contracts on L1 manage the same // validity checks that are done here. -func (s *SyncService) ApplyTransaction(tx *types.Transaction) error { - log.Debug("Sending transaction to sync service", "hash", tx.Hash().Hex()) - s.txLock.Lock() - defer s.txLock.Unlock() +func (s *SyncService) ValidateAndApplySequencerTransaction(tx *types.Transaction) error { if s.verifier { return errors.New("Verifier does not accept transactions out of band") } + if tx == nil { + return errors.New("nil transaction passed to ValidateAndApplySequencerTransaction") + } + + s.txLock.Lock() + defer s.txLock.Unlock() + + // On the first transaction received by a sequencer, switch the backend + if s.backend == "l2" { + log.Info("Sequencer syncing final transactions to tip") + s.syncTransactionsToTip("l2") + s.backend = "l1" + log.Info("Sequencing switch staring", "next-index", s.GetNextIndex()) + } + + log.Debug("Sequencer transaction validation", "hash", tx.Hash().Hex()) + qo := tx.QueueOrigin() if qo == nil { return errors.New("invalid transaction with no queue origin") @@ -661,10 +522,212 @@ func (s *SyncService) ApplyTransaction(tx *types.Transaction) error { txRaw, ) tx.SetTransactionMeta(newMeta) - return s.applyTransaction(tx) } +// syncTransactionsToTip will sync all of the transactions to the tip +func (s *SyncService) syncTransactionsToTip(backend string) error { + s.loopLock.Lock() + defer s.loopLock.Unlock() + + for { + latest, err := s.client.GetLatestTransaction(backend) + if err != nil { + return fmt.Errorf("Cannot get latest transaction: %w", err) + } + if latest == nil { + log.Info("No transactions to sync") + return nil + } + latestIndex := latest.GetMeta().Index + if latestIndex == nil { + return errors.New("Latest index is nil") + } + nextIndex := s.GetNextIndex() + log.Info("Syncing transactions to tip", "start", *latestIndex, "end", nextIndex) + + for i := nextIndex; i <= *latestIndex; i++ { + tx, err := s.client.GetTransaction(i, backend) + if err != nil { + log.Error("Cannot get latest transaction", "msg", err) + time.Sleep(time.Second * 2) + continue + } + if tx == nil { + return fmt.Errorf("Transaction %d is nil", i) + } + err = s.applyTransaction(tx) + if err != nil { + return fmt.Errorf("Cannot apply transaction: %w", err) + } + } + + post, err := s.client.GetLatestTransaction(backend) + if err != nil { + return fmt.Errorf("Cannot get latest transaction: %w", err) + } + postLatestIndex := post.GetMeta().Index + if postLatestIndex == nil { + return errors.New("Latest index is nil") + } + if *postLatestIndex == *latestIndex { + return nil + } + } +} + +func (s *SyncService) syncTransactionBatchesToTip() error { + s.loopLock.Lock() + defer s.loopLock.Unlock() + + for { + latest, _, err := s.client.GetLatestTransactionBatch() + if err != nil { + return fmt.Errorf("Cannot get latest transaction batch: %w", err) + } + if latest == nil { + log.Info("No transaction batches to sync") + return nil + } + latestIndex := latest.Index + nextIndex := s.GetNextVerifiedIndex() + + for i := nextIndex; i <= latestIndex; i++ { + _, txs, err := s.client.GetTransactionBatch(i) + if err != nil { + return fmt.Errorf("Cannot get transaction batch: %w", err) + } + for _, tx := range txs { + s.applyBatchedTransaction(tx) + } + } + post, _, err := s.client.GetLatestTransactionBatch() + if err != nil { + return fmt.Errorf("Cannot get latest transaction batch: %w", err) + } + if post.Index == latest.Index { + return nil + } + } +} + +func (s *SyncService) syncQueueToTip() error { + s.loopLock.Lock() + defer s.loopLock.Unlock() + + for { + latest, err := s.client.GetLatestEnqueue() + if err != nil { + return fmt.Errorf("Cannot get latest enqueue transaction: %w", err) + } + if latest == nil { + log.Info("No enqueue transactions to sync") + return nil + } + latestIndex := latest.GetMeta().QueueIndex + if latestIndex == nil { + return errors.New("Latest queue transaction has no queue index") + } + nextIndex := s.GetNextEnqueueIndex() + // TODO: make sure these indices are correct + log.Info("Syncing enqueue transactions to tip", "start", *latestIndex, "end", nextIndex) + + for i := nextIndex; i <= *latestIndex; i++ { + tx, err := s.client.GetEnqueue(i) + if err != nil { + return fmt.Errorf("Canot get enqueue transaction; %w", err) + } + if tx == nil { + return fmt.Errorf("Cannot get queue tx at index %d", i) + } + err = s.applyTransaction(tx) + if err != nil { + return fmt.Errorf("Cannot apply transaction: %w", err) + } + } + post, err := s.client.GetLatestEnqueue() + if err != nil { + return fmt.Errorf("Cannot get latest transaction: %w", err) + } + postLatestIndex := post.GetMeta().QueueIndex + if postLatestIndex == nil { + return errors.New("Latest queue index is nil") + } + if *latestIndex == *postLatestIndex { + return nil + } + } +} + +// Update the execution context's timestamp and blocknumber +// over time. This is only necessary for the sequencer. +func (s *SyncService) updateEthContext() error { + context, err := s.client.GetLatestEthContext() + if err != nil { + return fmt.Errorf("Cannot get eth context: %w", err) + } + current := time.Unix(int64(s.GetLatestL1Timestamp()), 0) + next := time.Unix(int64(context.Timestamp), 0) + if next.Sub(current) > s.timestampRefreshThreshold { + log.Info("Updating Eth Context", "timetamp", context.Timestamp, "blocknumber", context.BlockNumber) + s.SetLatestL1BlockNumber(context.BlockNumber) + s.SetLatestL1Timestamp(context.Timestamp) + } + return nil +} + +func (s *SyncService) VerifierLoop() { + log.Info("Starting Verifier Loop", "poll-interval", s.pollInterval, "timestamp-refresh-threshold", s.timestampRefreshThreshold) + for { + switch s.backend { + case "l1": + err := s.syncTransactionBatchesToTip() + if err != nil { + log.Error("Verifier cannot sync transaction batches", "msg", err) + } + case "l2": + err := s.syncTransactionsToTip("l2") + if err != nil { + log.Error("Verifier cannot sync transactions", "msg", err) + } + } + time.Sleep(s.pollInterval) + } +} + +func (s *SyncService) SequencerLoop() { + log.Info("Starting Sequencer Loop", "poll-interval", s.pollInterval, "timestamp-refresh-threshold", s.timestampRefreshThreshold) + for { + switch s.backend { + case "l1": + err := s.syncQueueToTip() + if err != nil { + log.Error("Sequencer cannot sync queue", "msg", err) + } + err = s.syncTransactionBatchesToTip() + if err != nil { + log.Error("Sequencer cannot sync transaction batches", "msg", err) + } + case "l2": + err := s.syncTransactionsToTip("l2") + if err != nil { + log.Error("Sequencer cannot sync", "msg", err) + } + } + err := s.updateEthContext() + if err != nil { + log.Error("Sequencer cannot update eth context", "msg", err) + } + time.Sleep(s.pollInterval) + } +} + +// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and +// starts sending event to the given channel. +func (s *SyncService) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { + return s.scope.Track(s.txFeed.Subscribe(ch)) +} + func getRawTransaction(tx *types.Transaction) ([]byte, error) { if tx == nil { return nil, errors.New("Cannot process nil transaction") @@ -754,3 +817,10 @@ func fixType(tx *types.Transaction) *types.Transaction { tx.SetTransactionMeta(fixed) return tx } + +func stringify(i *uint64) string { + if i == nil { + return "" + } + return strconv.FormatUint(*i, 10) +}