diff --git a/Cargo.lock b/Cargo.lock index 48d8e63d..74577db3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12799,10 +12799,12 @@ dependencies = [ "reqwest", "reth-chainspec", "reth-cli-util", + "reth-db", "reth-e2e-test-utils", "reth-engine-local", "reth-eth-wire-types", "reth-evm", + "reth-fs-util", "reth-network", "reth-network-api", "reth-network-p2p", @@ -12983,6 +12985,7 @@ dependencies = [ "reth-scroll-engine-primitives", "reth-scroll-node", "reth-scroll-primitives", + "reth-tasks", "reth-tracing", "rollup-node", "rollup-node-chain-orchestrator", @@ -13044,6 +13047,7 @@ dependencies = [ "metrics", "metrics-derive", "rand 0.9.2", + "reth-tasks", "reth-tracing", "rollup-node-primitives", "rollup-node-providers", diff --git a/Cargo.toml b/Cargo.toml index 9dbe1550..2a6b0706 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,9 +152,11 @@ scroll-alloy-rpc-types-engine = { git = "https://github.com/scroll-tech/reth.git # reth reth-chainspec = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false } +reth-db = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false } reth-e2e-test-utils = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3" } reth-eth-wire = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false } reth-eth-wire-types = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false } +reth-fs-util = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false } reth-network = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false } reth-network-api = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false } reth-network-p2p = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false } diff --git a/crates/chain-orchestrator/src/error.rs b/crates/chain-orchestrator/src/error.rs index 504daaba..043a9f33 100644 --- a/crates/chain-orchestrator/src/error.rs +++ b/crates/chain-orchestrator/src/error.rs @@ -92,6 +92,10 @@ pub enum ChainOrchestratorError { /// An error occurred while handling rollup node primitives. #[error("An error occurred while handling rollup node primitives: {0}")] RollupNodePrimitiveError(rollup_node_primitives::RollupNodePrimitiveError), + /// Shutdown requested - this is not a real error but used to signal graceful shutdown. + #[cfg(feature = "test-utils")] + #[error("Shutdown requested")] + Shutdown, } impl CanRetry for ChainOrchestratorError { diff --git a/crates/chain-orchestrator/src/handle/command.rs b/crates/chain-orchestrator/src/handle/command.rs index 03ed97ac..915c7120 100644 --- a/crates/chain-orchestrator/src/handle/command.rs +++ b/crates/chain-orchestrator/src/handle/command.rs @@ -33,6 +33,9 @@ pub enum ChainOrchestratorCommand>), + /// Request the `ChainOrchestrator` to shutdown immediately. + #[cfg(feature = "test-utils")] + Shutdown(oneshot::Sender<()>), } /// The database queries that can be sent to the rollup manager. 
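The `Shutdown(oneshot::Sender<()>)` variant added above follows the same command-plus-acknowledgment pattern as the rest of the handle commands: the caller sends a command carrying a oneshot sender and awaits the reply before proceeding. A minimal sketch of that pattern using plain tokio channels (the `Command` enum and task names here are illustrative, not the crate's actual types):

```rust
use tokio::sync::{mpsc, oneshot};

// Illustrative stand-in for `ChainOrchestratorCommand`.
enum Command {
    Shutdown(oneshot::Sender<()>),
}

#[tokio::main]
async fn main() {
    let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<Command>();

    // Event-loop side: acknowledge the request, then break out of the loop.
    let event_loop = tokio::spawn(async move {
        while let Some(cmd) = cmd_rx.recv().await {
            match cmd {
                Command::Shutdown(ack) => {
                    let _ = ack.send(()); // the caller learns the loop is exiting
                    break;
                }
            }
        }
    });

    // Handle side: send the command, then await the acknowledgment.
    let (ack_tx, ack_rx) = oneshot::channel();
    cmd_tx.send(Command::Shutdown(ack_tx)).expect("event loop is alive");
    ack_rx.await.expect("acknowledgment before exit");
    event_loop.await.unwrap();
}
```

Acknowledging before breaking is what lets the `shutdown()` helper in `handle/mod.rs` below resolve cleanly instead of observing a dropped sender as a `RecvError`.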
diff --git a/crates/chain-orchestrator/src/handle/mod.rs b/crates/chain-orchestrator/src/handle/mod.rs index b62ee195..fa3e44ce 100644 --- a/crates/chain-orchestrator/src/handle/mod.rs +++ b/crates/chain-orchestrator/src/handle/mod.rs @@ -120,4 +120,13 @@ impl> ChainOrchestratorHand self.send_command(ChainOrchestratorCommand::DatabaseHandle(tx)); rx.await } + + /// Sends a command to shutdown the `ChainOrchestrator` immediately. + /// This will cause the `ChainOrchestrator`'s event loop to exit gracefully. + #[cfg(feature = "test-utils")] + pub async fn shutdown(&self) -> Result<(), oneshot::error::RecvError> { + let (tx, rx) = oneshot::channel(); + self.send_command(ChainOrchestratorCommand::Shutdown(tx)); + rx.await + } } diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 01967d88..a119d013 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -192,8 +192,16 @@ impl< break; } Some(command) = self.handle_rx.recv() => { - if let Err(err) = self.handle_command(command).await { - tracing::error!(target: "scroll::chain_orchestrator", ?err, "Error handling command"); + match self.handle_command(command).await { + #[cfg(feature = "test-utils")] + Err(ChainOrchestratorError::Shutdown) => { + tracing::info!(target: "scroll::chain_orchestrator", "Shutdown requested, exiting gracefully"); + break; + } + Err(err) => { + tracing::error!(target: "scroll::chain_orchestrator", ?err, "Error handling command"); + } + Ok(_) => {} } } Some(event) = async { @@ -410,6 +418,13 @@ impl< ChainOrchestratorCommand::DatabaseHandle(tx) => { let _ = tx.send(self.database.clone()); } + #[cfg(feature = "test-utils")] + ChainOrchestratorCommand::Shutdown(tx) => { + tracing::info!(target: "scroll::chain_orchestrator", "Received shutdown command, exiting event loop"); + let _ = tx.send(()); + // Return an error to signal shutdown + return Err(ChainOrchestratorError::Shutdown); + } } Ok(()) diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 77f8b399..b03941ec 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -81,8 +81,10 @@ anvil = { git = "https://github.com/foundry-rs/foundry.git", rev = "2c84e1c970d1 alloy-rpc-types-eth = { workspace = true, optional = true } alloy-rpc-types-engine = { workspace = true, optional = true } alloy-rpc-types-anvil = { workspace = true, optional = true } +reth-db = { workspace = true, optional = true, features = ["test-utils"] } reth-e2e-test-utils = { workspace = true, optional = true } reth-engine-local = { workspace = true, optional = true } +reth-fs-util = { workspace = true, optional = true } reth-provider = { workspace = true, optional = true } reth-rpc-layer = { workspace = true, optional = true } reth-rpc-server-types = { workspace = true, optional = true } @@ -112,6 +114,7 @@ console-subscriber = "0.5.0" alloy-chains.workspace = true alloy-eips.workspace = true futures.workspace = true +reth-db = { workspace = true, features = ["test-utils"] } reth-e2e-test-utils.workspace = true reth-node-core.workspace = true reth-provider.workspace = true @@ -132,7 +135,9 @@ alloy-rpc-types-eth = { workspace = true } [features] js-tracer = ["reth-scroll-node/js-tracer", "reth-scroll-rpc/js-tracer"] test-utils = [ + "reth-db", "reth-engine-local", + "reth-fs-util", "reth-trie-db/test-utils", "reth-chainspec/test-utils", "reth-evm/test-utils", diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs index 8c359d77..f2b7d1e9 100644 --- a/crates/node/src/args.rs +++ 
b/crates/node/src/args.rs @@ -147,6 +147,7 @@ impl ScrollRollupNodeConfig { ) -> eyre::Result<()> { // Instantiate the database let db_path = node_config.datadir().db(); + let database_path = if let Some(database_path) = &self.database_args.rn_db_path { database_path.to_string_lossy().to_string() } else { diff --git a/crates/node/src/test_utils/block_builder.rs b/crates/node/src/test_utils/block_builder.rs index 38a42097..8b9a8893 100644 --- a/crates/node/src/test_utils/block_builder.rs +++ b/crates/node/src/test_utils/block_builder.rs @@ -92,7 +92,8 @@ impl<'a> BlockBuilder<'a> { /// Build the block and validate against expectations. pub async fn build_and_await_block(self) -> eyre::Result { - let sequencer_node = &self.fixture.nodes[0]; + let sequencer_node = + self.fixture.nodes[0].as_ref().expect("sequencer node has been shutdown"); // Get the sequencer from the rollup manager handle let handle = &sequencer_node.rollup_manager_handle; diff --git a/crates/node/src/test_utils/database.rs b/crates/node/src/test_utils/database.rs index 4210aeb1..41688327 100644 --- a/crates/node/src/test_utils/database.rs +++ b/crates/node/src/test_utils/database.rs @@ -19,7 +19,10 @@ impl<'a> DatabaseHelper<'a> { /// Get the database for this node. async fn database(&self) -> eyre::Result> { - Ok(self.fixture.nodes[self.node_index].rollup_manager_handle.get_database_handle().await?) + let node = self.fixture.nodes[self.node_index] + .as_ref() + .ok_or_else(|| eyre::eyre!("Node at index {} has been shutdown", self.node_index))?; + Ok(node.rollup_manager_handle.get_database_handle().await?) } /// Get the finalized block number of a batch by its index. diff --git a/crates/node/src/test_utils/event_utils.rs b/crates/node/src/test_utils/event_utils.rs index 2e1a5519..43a53577 100644 --- a/crates/node/src/test_utils/event_utils.rs +++ b/crates/node/src/test_utils/event_utils.rs @@ -215,7 +215,10 @@ impl<'a> EventWaiter<'a> { ) -> eyre::Result> { let mut matched_events = Vec::new(); for node in self.node_indices { - let events = &mut self.fixture.nodes[node].chain_orchestrator_rx; + let Some(node_handle) = &mut self.fixture.nodes[node] else { + continue; // Skip shutdown nodes + }; + let events = &mut node_handle.chain_orchestrator_rx; let mut node_matched_events = Vec::new(); let result = timeout(self.timeout_duration, async { @@ -282,7 +285,10 @@ impl<'a> EventWaiter<'a> { continue; } - let events = &mut self.fixture.nodes[node_index].chain_orchestrator_rx; + let node_handle = self.fixture.nodes[node_index].as_mut().ok_or_else(|| { + eyre::eyre!("Node at index {} has been shutdown", node_index) + })?; + let events = &mut node_handle.chain_orchestrator_rx; // Try to get the next event (non-blocking with try_next) if let Some(event) = events.next().now_or_never() { diff --git a/crates/node/src/test_utils/fixture.rs b/crates/node/src/test_utils/fixture.rs index a25735b4..d3f1c91f 100644 --- a/crates/node/src/test_utils/fixture.rs +++ b/crates/node/src/test_utils/fixture.rs @@ -21,6 +21,7 @@ use alloy_transport::layers::RetryBackoffLayer; use reth_chainspec::EthChainSpec; use reth_e2e_test_utils::{wallet::Wallet, NodeHelperType, TmpDB}; use reth_eth_wire_types::BasicNetworkPrimitives; +use reth_fs_util::remove_dir_all; use reth_network::NetworkHandle; use reth_node_builder::NodeTypes; use reth_node_types::NodeTypesWithDBAdapter; @@ -47,7 +48,11 @@ use tokio::sync::{mpsc, Mutex}; /// Main test fixture providing a high-level interface for testing rollup nodes. 
pub struct TestFixture { /// The list of nodes in the test setup. - pub nodes: Vec, + /// Using Option to allow nodes to be shutdown without changing indices. + /// A None value means the node at that index has been shutdown. + pub nodes: Vec>, + /// Database references for each node, used for reboot scenarios. + pub dbs: Vec>>, /// Shared wallet for generating transactions. pub wallet: Arc>, /// Chain spec used by the nodes. @@ -55,7 +60,9 @@ pub struct TestFixture { /// Optional Anvil instance for L1 simulation. pub anvil: Option, /// The task manager. Held in order to avoid dropping the node. - _tasks: TaskManager, + pub tasks: TaskManager, + /// The configuration for the nodes. + pub config: ScrollRollupNodeConfig, } impl Debug for TestFixture { @@ -70,6 +77,21 @@ impl Debug for TestFixture { } } +impl Drop for TestFixture { + fn drop(&mut self) { + // Manually cleanup test directories. + // TempDatabase's automatic drop only removes the database file itself, + // but the parent directory also contains other files (static files, blob store, etc.) + // that need to be cleaned up to avoid accumulating test artifacts. + let parent_paths: Vec<_> = + self.dbs.iter().filter_map(|db| db.path().parent().map(|p| p.to_path_buf())).collect(); + // Delete parent directories containing all database files + for path in parent_paths { + let _ = remove_dir_all(&path); + } + } +} + /// The network handle to the Scroll network. pub type ScrollNetworkHandle = NetworkHandle>; @@ -132,19 +154,28 @@ impl TestFixture { TestFixtureBuilder::new() } + /// Get a node by index, returning an error if it has been shutdown. + fn get_node(&self, node_index: usize) -> eyre::Result<&NodeHandle> { + self.nodes + .get(node_index) + .ok_or_else(|| eyre::eyre!("Node index {} out of bounds", node_index))? + .as_ref() + .ok_or_else(|| eyre::eyre!("Node at index {} has been shutdown", node_index)) + } + /// Get the sequencer node (assumes first node is sequencer). pub fn sequencer(&mut self) -> &mut NodeHandle { - let handle = &mut self.nodes[0]; + let handle = self.nodes[0].as_mut().expect("sequencer node has been shutdown"); assert!(handle.is_sequencer(), "expected sequencer, got follower"); handle } /// Get a follower node by index. pub fn follower(&mut self, index: usize) -> &mut NodeHandle { - if index == 0 && self.nodes[0].is_sequencer() { - return &mut self.nodes[index + 1]; + if index == 0 && self.nodes[0].as_ref().map(|n| n.is_sequencer()).unwrap_or(false) { + return self.nodes[index + 1].as_mut().expect("follower node has been shutdown"); } - &mut self.nodes[index] + self.nodes[index].as_mut().expect("follower node has been shutdown") } /// Get the wallet. 
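The switch to `Vec<Option<NodeHandle>>` above keeps node indices stable across a shutdown/restart cycle: shutting a node down leaves `None` in its slot instead of removing the element. A self-contained sketch of the slot pattern (`Node` is a hypothetical stand-in for the fixture's real handle type):

```rust
// Hypothetical stand-in for `NodeHandle`.
struct Node {
    id: usize,
}

struct Fixture {
    nodes: Vec<Option<Node>>,
}

impl Fixture {
    /// Shutdown takes the handle out and leaves `None`, so later indices are untouched.
    fn shutdown(&mut self, i: usize) -> Option<Node> {
        self.nodes[i].take()
    }

    /// Restart refills the same slot.
    fn start(&mut self, i: usize, node: Node) {
        assert!(self.nodes[i].is_none(), "slot {i} is still occupied");
        self.nodes[i] = Some(node);
    }
}

fn main() {
    let mut fixture = Fixture { nodes: vec![Some(Node { id: 0 }), Some(Node { id: 1 })] };
    let stopped = fixture.shutdown(0).expect("node 0 was running");
    assert_eq!(stopped.id, 0);
    assert!(fixture.nodes[0].is_none()); // slot emptied...
    assert!(fixture.nodes[1].is_some()); // ...but index 1 still refers to the same node
    fixture.start(0, stopped);
}
```

This is why the accessors below (`get_node`, `sequencer`, `follower`, and friends) go through `as_ref()`/`as_mut()` and surface a "has been shutdown" error rather than panicking on a stale index.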
@@ -178,7 +209,10 @@ impl TestFixture { node_index: usize, tx: impl Into, ) -> eyre::Result<()> { - self.nodes[node_index].node.rpc.inject_tx(tx.into()).await?; + let node = self.nodes[node_index] + .as_ref() + .ok_or_else(|| eyre::eyre!("Node at index {} has been shutdown", node_index))?; + node.node.rpc.inject_tx(tx.into()).await?; Ok(()) } @@ -186,8 +220,10 @@ impl TestFixture { pub async fn get_block(&self, node_index: usize) -> eyre::Result> { use reth_rpc_api::EthApiServer; - self.nodes[node_index] - .node + let node = self.nodes[node_index] + .as_ref() + .ok_or_else(|| eyre::eyre!("Node at index {} has been shutdown", node_index))?; + node.node .rpc .inner .eth_api() @@ -206,8 +242,8 @@ impl TestFixture { &self, node_index: usize, ) -> eyre::Result { - self.nodes[node_index] - .rollup_manager_handle + let node = self.get_node(node_index)?; + node.rollup_manager_handle .status() .await .map_err(|e| eyre::eyre!("Failed to get status: {}", e)) @@ -560,8 +596,7 @@ impl TestFixtureBuilder { } /// Build the test fixture. - pub async fn build(self) -> eyre::Result { - let mut config = self.config; + pub async fn build(mut self) -> eyre::Result { let chain_spec = self.chain_spec.unwrap_or_else(|| SCROLL_DEV.clone()); // Start Anvil if requested @@ -582,22 +617,25 @@ impl TestFixtureBuilder { .map_err(|e| eyre::eyre!("Failed to parse Anvil endpoint URL: {}", e))?; // Configure L1 provider and blob provider to use Anvil - config.l1_provider_args.url = Some(endpoint_url.clone()); - config.l1_provider_args.logs_query_block_range = 500; - config.blob_provider_args.anvil_url = Some(endpoint_url); - config.blob_provider_args.mock = false; + self.config.l1_provider_args.url = Some(endpoint_url.clone()); + self.config.l1_provider_args.logs_query_block_range = 500; + self.config.blob_provider_args.anvil_url = Some(endpoint_url); + self.config.blob_provider_args.mock = false; Some(handle) } else { None }; - let (nodes, _tasks, wallet) = setup_engine( - config.clone(), + let tasks = TaskManager::current(); + let (nodes, dbs, wallet) = setup_engine( + &tasks, + self.config.clone(), self.num_nodes, chain_spec.clone(), self.is_dev, self.no_local_transactions_propagation, + None, ) .await?; @@ -622,26 +660,28 @@ impl TestFixtureBuilder { let chain_orchestrator_rx = node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?; - node_handles.push(NodeHandle { + node_handles.push(Some(NodeHandle { node, engine, chain_orchestrator_rx, l1_watcher_tx, rollup_manager_handle, - typ: if config.sequencer_args.sequencer_enabled && index == 0 { + typ: if self.config.sequencer_args.sequencer_enabled && index == 0 { NodeType::Sequencer } else { NodeType::Follower }, - }); + })); } Ok(TestFixture { nodes: node_handles, + dbs, wallet: Arc::new(Mutex::new(wallet)), chain_spec, - _tasks, + tasks, anvil, + config: self.config, }) } diff --git a/crates/node/src/test_utils/l1_helpers.rs b/crates/node/src/test_utils/l1_helpers.rs index d6c858cf..e438b716 100644 --- a/crates/node/src/test_utils/l1_helpers.rs +++ b/crates/node/src/test_utils/l1_helpers.rs @@ -115,7 +115,7 @@ impl<'a> L1Helper<'a> { self.fixture.nodes.iter().collect() }; - for node in nodes { + for node in nodes.into_iter().flatten() { if let Some(tx) = &node.l1_watcher_tx { tx.send(notification.clone()).await?; } @@ -221,7 +221,7 @@ impl<'a> L1MessageBuilder<'a> { self.l1_helper.fixture.nodes.iter().collect() }; - for node in nodes { + for node in nodes.into_iter().flatten() { if let Some(tx) = &node.l1_watcher_tx { 
tx.send(notification.clone()).await?; } diff --git a/crates/node/src/test_utils/mod.rs b/crates/node/src/test_utils/mod.rs index a492ba0c..855f78a4 100644 --- a/crates/node/src/test_utils/mod.rs +++ b/crates/node/src/test_utils/mod.rs @@ -66,6 +66,7 @@ pub mod event_utils; pub mod fixture; pub mod l1_helpers; pub mod network_helpers; +pub mod reboot; pub mod tx_helpers; // Re-export main types for convenience @@ -84,6 +85,7 @@ use crate::{ }; use alloy_primitives::Bytes; use reth_chainspec::EthChainSpec; +use reth_db::test_utils::create_test_rw_db_with_path; use reth_e2e_test_utils::{ node::NodeTestContext, transaction::TransactionTestContext, wallet::Wallet, Adapter, NodeHelperType, TmpDB, TmpNodeAddOnsHandle, TmpNodeEthApi, @@ -108,11 +110,13 @@ use tracing::{span, Level}; /// This is the legacy setup function that's used by existing tests. /// For new tests, consider using the `TestFixture` API instead. pub async fn setup_engine( + tasks: &TaskManager, mut scroll_node_config: ScrollRollupNodeConfig, num_nodes: usize, chain_spec: Arc<::ChainSpec>, is_dev: bool, no_local_transactions_propagation: bool, + reboot_info: Option<(usize, Arc>)>, ) -> eyre::Result<( Vec< NodeHelperType< @@ -120,7 +124,7 @@ pub async fn setup_engine( BlockchainProvider>, >, >, - TaskManager, + Vec>>, Wallet, )> where @@ -131,7 +135,6 @@ where TmpNodeAddOnsHandle: RpcHandleProvider, TmpNodeEthApi>, { - let tasks = TaskManager::current(); let exec = tasks.executor(); let network_config = NetworkArgs { @@ -141,13 +144,20 @@ where // Create nodes and peer them let mut nodes: Vec> = Vec::with_capacity(num_nodes); + let mut dbs: Vec>> = Vec::new(); + for idx in 0..num_nodes { - // disable sequencer nodes after the first one - if idx != 0 { + // Determine the actual node index (for a reboot, use the provided index; otherwise use idx) + let node_index = reboot_info.as_ref().map(|(node_idx, _)| *node_idx).unwrap_or(idx); + + // Disable sequencer for all nodes except index 0 + if node_index != 0 { scroll_node_config.sequencer_args.sequencer_enabled = false; } - let node_config = NodeConfig::new(chain_spec.clone()) + + // Configure node with the test data directory + let mut node_config = NodeConfig::new(chain_spec.clone()) .with_network(network_config.clone()) .with_unused_ports() .with_rpc( @@ -159,9 +170,53 @@ where .set_dev(is_dev) .with_txpool(TxPoolArgs { no_local_transactions_propagation, ..Default::default() }); - let span = span!(Level::INFO, "node", idx); + // Check whether a database was provided for this node (reboot scenario) + let db = if let Some((_, provided_db)) = &reboot_info { + // Reuse existing database for reboot + let db_path = provided_db.path(); + let test_data_dir = db_path.parent().expect("db path should have a parent directory"); + + // Set the datadir in node_config to reuse the same directory + node_config.datadir.datadir = + reth_node_core::dirs::MaybePlatformPath::from(test_data_dir.to_path_buf()); + + tracing::info!( + "Reusing existing database for node {} at {:?}", + node_index, + test_data_dir + ); + provided_db.clone() + } else { + // Create a unique persistent test directory for both Reth and Scroll databases + // Using process ID and node index to ensure uniqueness + let test_data_dir = std::env::temp_dir().join(format!( + "scroll-test-{}-node-{}-{}", + std::process::id(), + node_index, + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + )); +
std::fs::create_dir_all(&test_data_dir).expect("failed to create test data directory"); + + // Set the datadir in node_config (critical for proper initialization) + node_config.datadir.datadir = + reth_node_core::dirs::MaybePlatformPath::from(test_data_dir.clone()); + + // Create Reth database in the test directory's db subdirectory + let new_db = create_test_rw_db_with_path(node_config.datadir().db()); + + tracing::info!("Created new database for node {} at {:?}", node_index, test_data_dir); + dbs.push(new_db.clone()); + new_db + }; + + let span = span!(Level::INFO, "node", node_index); let _enter = span.enter(); - let testing_node = NodeBuilder::new(node_config.clone()).testing_node(exec.clone()); + let testing_node = NodeBuilder::new(node_config.clone()) + .with_database(db.clone()) + .with_launch_context(exec.clone()); let testing_config = testing_node.config().clone(); let node = ScrollRollupNode::new(scroll_node_config.clone(), testing_config).await; let RethNodeHandle { node, node_exit_future: _ } = testing_node @@ -186,8 +241,11 @@ where NodeTestContext::new(node, |_| panic!("should not build payloads using this method")) .await?; - let genesis = node.block_hash(0); - node.update_forkchoice(genesis, genesis).await?; + // skip the forkchoice update when a database is provided (reboot scenario) + if reboot_info.is_none() { + let genesis = node.block_hash(0); + node.update_forkchoice(genesis, genesis).await?; + } // Connect each node in a chain. if let Some(previous_node) = nodes.last_mut() { @@ -202,9 +260,10 @@ where } nodes.push(node); + // Note: db is already added to dbs in the creation logic above } - Ok((nodes, tasks, Wallet::default().with_chain_id(chain_spec.chain().into()))) + Ok((nodes, dbs, Wallet::default().with_chain_id(chain_spec.chain().into()))) } /// Generate a transfer transaction with the given wallet. diff --git a/crates/node/src/test_utils/network_helpers.rs b/crates/node/src/test_utils/network_helpers.rs index 6f9d3876..25dcbc62 100644 --- a/crates/node/src/test_utils/network_helpers.rs +++ b/crates/node/src/test_utils/network_helpers.rs @@ -83,8 +83,10 @@ impl<'a> NetworkHelper<'a> { pub async fn network_handle( &self, ) -> eyre::Result> { - self.fixture.nodes[self.node_index] - .rollup_manager_handle + let node = self.fixture.nodes[self.node_index] + .as_ref() + .ok_or_else(|| eyre::eyre!("Node at index {} has been shutdown", self.node_index))?; + node.rollup_manager_handle .get_network_handle() .await .map_err(|e| eyre::eyre!("Failed to get network handle: {}", e)) diff --git a/crates/node/src/test_utils/reboot.rs b/crates/node/src/test_utils/reboot.rs new file mode 100644 index 00000000..5924ed50 --- /dev/null +++ b/crates/node/src/test_utils/reboot.rs @@ -0,0 +1,267 @@ +//! Reboot utilities for the rollup node. +//! +//! This module provides functionality to safely shutdown and restart nodes in integration tests, +//! enabling testing of: +//! - State persistence across reboots +//! - Correct restoration of `ForkchoiceState` (FCS) from database +//! - Continued L1 event processing after restart +//! - Proper handling of L1 reorgs after node restarts +//! +//! # Key Features +//! +//! ## Explicit Shutdown +//! The `shutdown_node()` method performs a graceful shutdown by: +//! 1. Sending an explicit shutdown command to the `ChainOrchestrator` +//! 2. Dropping the `NodeHandle` to release all resources +//! 3. Waiting for async cleanup to complete +//! +//! This ensures the node stops cleanly without lingering RPC errors or background tasks. +//! +//! 
## State Restoration +//! The `start_node()` method restarts a previously shutdown node by: +//! 1. Reusing the existing database (state is preserved) +//! 2. Restoring the `ForkchoiceState` from the `ChainOrchestrator`'s persisted status +//! 3. Initializing the `Engine` with the restored FCS (not genesis) +//! +//! This prevents "Syncing" errors that would occur if the FCS was reset to genesis +//! while the execution client's state remained at a later block. +//! +//! # Example +//! ```ignore +//! let mut fixture = TestFixture::builder() +//! .followers(1) +//! .with_anvil(None, None, Some(22222222), None, None) +//! .build() +//! .await?; +//! +//! // Process some L1 events +//! fixture.l1().sync().await?; +//! for i in 0..=3 { +//! let tx = read_test_transaction("commitBatch", &i.to_string())?; +//! fixture.anvil_inject_tx(tx).await?; +//! } +//! +//! // Reboot the node +//! fixture.shutdown_node(0).await?; +//! fixture.start_node(0).await?; +//! +//! // Node continues processing from where it left off +//! fixture.l1().sync().await?; +//! fixture.expect_event().l1_synced().await?; +//! ``` + +use crate::test_utils::{setup_engine, NodeHandle, TestFixture}; +use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi}; +use scroll_engine::Engine; +use std::sync::Arc; + +impl TestFixture { + /// Shutdown a node by index, performing a graceful shutdown of all components. + /// + /// # Shutdown Process + /// + /// 1. **Explicit `ChainOrchestrator` Shutdown** + /// - Sends a `Shutdown` command to the `ChainOrchestrator` via its handle + /// - This makes the orchestrator exit its event loop immediately + /// - Prevents lingering RPC errors (e.g., 502 errors) after shutdown + /// + /// 2. **Drop `NodeHandle`** + /// - Takes ownership of the `NodeHandle` and drops it + /// - Triggers cleanup of: + /// - Network connections + /// - RPC server (stops listening on port) + /// - Database connections (flushes pending writes) + /// - Background tasks (via `TaskManager` shutdown signals) + /// - L1 watcher channels + /// + /// 3. **Wait for Cleanup** + /// - Sleeps for 1 second to allow async cleanup to complete + /// - Ensures database WAL is checkpointed (in WAL mode) + /// - Allows all background tasks to finish gracefully + /// + /// The node can later be restarted using `start_node()`, which will reuse + /// the existing database and restore state from it. 
+ /// + /// # Arguments + /// * `node_index` - Index of the node to shutdown (0 = sequencer if enabled) + /// + /// # Errors + /// Returns an error if: + /// - `node_index` is out of bounds + /// - The node at `node_index` is already shutdown + /// + /// # Example + /// ```ignore + /// let mut fixture = TestFixture::builder().followers(2).build().await?; + /// + /// // Shutdown follower node at index 1 + /// fixture.shutdown_node(1).await?; + /// + /// // Node slot is now None, but indices are preserved + /// assert!(fixture.nodes[1].is_none()); + /// + /// // Later, restart the node + /// fixture.start_node(1).await?; + /// ``` + pub async fn shutdown_node(&mut self, node_index: usize) -> eyre::Result<()> { + if node_index >= self.nodes.len() { + return Err(eyre::eyre!( + "Node index {} out of bounds (total nodes: {})", + node_index, + self.nodes.len() + )); + } + + if self.nodes[node_index].is_none() { + return Err(eyre::eyre!("Node at index {} is already shutdown", node_index)); + } + + tracing::info!("Shutting down node at index {}", node_index); + + // Step 1: Explicitly shutdown the ChainOrchestrator + // This sends a Shutdown command that will make the ChainOrchestrator exit its event loop + // immediately + if let Some(node) = &self.nodes[node_index] { + tracing::info!("Sending shutdown command to ChainOrchestrator..."); + if let Err(e) = node.rollup_manager_handle.shutdown().await { + tracing::warn!("Failed to send shutdown command to ChainOrchestrator: {}", e); + } else { + tracing::info!("ChainOrchestrator shutdown command acknowledged"); + } + } + + // Step 2: Take ownership and drop the node handle + // This closes all channels (RPC, network, DB) and releases resources + let node_handle = self.nodes[node_index].take(); + drop(node_handle); + + // Step 3: Wait for async cleanup to complete + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + tracing::info!("Node at index {} shutdown complete", node_index); + Ok(()) + } + + /// Restart a previously shutdown node at the given index. + /// + /// This method restarts a node that was shut down, reusing its existing database + /// and restoring its state (including `ForkchoiceState`) from persisted data. + /// + /// # State Restoration + /// + /// The restarted node will: + /// 1. Reuse the same database (no data loss) + /// 2. Restore the `ForkchoiceState` from `ChainOrchestrator`'s persisted status + /// 3. Initialize the `Engine` with the restored FCS (not genesis) + /// 4. Continue processing L1 events from where it left off + /// + /// This ensures the node's view of the chain is consistent with the execution + /// client's actual state, preventing "Syncing" errors when building payloads. 
+ /// + /// # Arguments + /// * `node_index` - Index of the node to start (0 = sequencer if enabled) + /// + /// # Errors + /// Returns an error if: + /// - `node_index` is out of bounds + /// - The node at `node_index` is already running + /// + /// # Example + /// ```ignore + /// let mut fixture = TestFixture::builder().followers(1).build().await?; + /// + /// // Shutdown the follower + /// fixture.shutdown_node(0).await?; + /// + /// // Restart the follower (reuses database and restores state) + /// fixture.start_node(0).await?; + /// + /// // Node continues from where it left off + /// fixture.l1().sync().await?; + /// fixture.expect_event().l1_synced().await?; + /// ``` + pub async fn start_node(&mut self, node_index: usize) -> eyre::Result<()> { + // Step 1: Validate the node index and ensure the slot is currently empty + if node_index >= self.nodes.len() { + return Err(eyre::eyre!( + "Node index {} out of bounds (total nodes: {})", + node_index, + self.nodes.len() + )); + } + + if self.nodes[node_index].is_some() { + return Err(eyre::eyre!("Node at index {} is already running", node_index)); + } + + tracing::info!("Starting node at index {} (reusing database)", node_index); + + // Step 2: Create a new node instance with the existing database + let (mut new_nodes, _, _) = setup_engine( + &self.tasks, + self.config.clone(), + 1, + self.chain_spec.clone(), + true, + false, + Some((node_index, self.dbs[node_index].clone())), + ) + .await?; + + if new_nodes.is_empty() { + return Err(eyre::eyre!("Failed to create new node")); + } + + let new_node = new_nodes.remove(0); + + // Step 3: Setup Engine API client + let auth_client = new_node.inner.engine_http_client(); + let engine_client = Arc::new(ScrollAuthApiEngineClient::new(auth_client)) + as Arc; + + // Step 4: Get necessary handles from the new node + let l1_watcher_tx = new_node.inner.add_ons_handle.l1_watcher_tx.clone(); + let rollup_manager_handle = new_node.inner.add_ons_handle.rollup_manager_handle.clone(); + + // Step 5: Restore ForkchoiceState from persisted ChainOrchestrator status + // CRITICAL: This must NOT be reset to genesis. The execution client's state + // is at a later block, and resetting FCS to genesis would cause a mismatch, + // resulting in "Syncing" errors when building payloads after reboot.
+ let status = rollup_manager_handle.status().await?; + let fcs: scroll_engine::ForkchoiceState = status.l2.fcs; + + tracing::info!( + "Restored FCS from database - head: {:?}, safe: {:?}, finalized: {:?}", + fcs.head_block_info(), + fcs.safe_block_info(), + fcs.finalized_block_info() + ); + + // Step 6: Initialize Engine with the restored ForkchoiceState + let engine = Engine::new(Arc::new(engine_client), fcs); + let chain_orchestrator_rx = + new_node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?; + + // Step 7: Determine node type (sequencer vs follower) + let was_sequencer = self.config.sequencer_args.sequencer_enabled && node_index == 0; + + // Step 8: Create the new NodeHandle with all components + let new_node_handle = NodeHandle { + node: new_node, + engine, + chain_orchestrator_rx, + l1_watcher_tx, + rollup_manager_handle, + typ: if was_sequencer { + crate::test_utils::fixture::NodeType::Sequencer + } else { + crate::test_utils::fixture::NodeType::Follower + }, + }; + + // Step 9: Replace the old (None) node slot with the new node + self.nodes[node_index] = Some(new_node_handle); + tracing::info!("Node started successfully at index {}", node_index); + + Ok(()) + } +} diff --git a/crates/node/src/test_utils/tx_helpers.rs b/crates/node/src/test_utils/tx_helpers.rs index e94bf955..0a85afc2 100644 --- a/crates/node/src/test_utils/tx_helpers.rs +++ b/crates/node/src/test_utils/tx_helpers.rs @@ -71,7 +71,11 @@ impl<'a> TransferTxBuilder<'a> { drop(wallet); // Inject into the target node - let node = &self.tx_helper.fixture.nodes[self.tx_helper.target_node_index]; + let node = self.tx_helper.fixture.nodes[self.tx_helper.target_node_index] + .as_ref() + .ok_or_else(|| { + eyre::eyre!("Node at index {} has been shutdown", self.tx_helper.target_node_index) + })?; let tx_hash = node.node.rpc.inject_tx(raw_tx).await?; Ok(tx_hash) diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index ba5c296b..7fdf262a 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -13,7 +13,7 @@ use reth_scroll_chainspec::{ScrollChainSpec, SCROLL_DEV, SCROLL_MAINNET, SCROLL_ use reth_scroll_node::ScrollNetworkPrimitives; use reth_scroll_primitives::ScrollBlock; use reth_storage_api::BlockReader; -use reth_tasks::shutdown::signal as shutdown_signal; +use reth_tasks::{shutdown::signal as shutdown_signal, TaskManager}; use reth_tokio_util::EventStream; use rollup_node::{ constants::SCROLL_GAS_LIMIT, @@ -296,13 +296,16 @@ async fn can_forward_tx_to_sequencer() -> eyre::Result<()> { // Create the chain spec for scroll mainnet with Euclid v2 activated and a test genesis. 
let chain_spec = (*SCROLL_DEV).clone(); - let (mut sequencer_node, _tasks, _) = - setup_engine(sequencer_node_config, 1, chain_spec.clone(), false, true).await.unwrap(); + let tasks = TaskManager::current(); + let (mut sequencer_node, _, _) = + setup_engine(&tasks, sequencer_node_config, 1, chain_spec.clone(), false, true, None) + .await + .unwrap(); let sequencer_url = format!("http://localhost:{}", sequencer_node[0].rpc_url().port().unwrap()); follower_node_config.network_args.sequencer_url = Some(sequencer_url); - let (mut follower_node, _tasks, wallet) = - setup_engine(follower_node_config, 1, chain_spec, false, true).await.unwrap(); + let (mut follower_node, _, wallet) = + setup_engine(&tasks, follower_node_config, 1, chain_spec, false, true, None).await.unwrap(); let wallet = Arc::new(Mutex::new(wallet)); @@ -459,9 +462,17 @@ async fn can_bridge_blocks() -> eyre::Result<()> { let chain_spec = (*SCROLL_DEV).clone(); // Setup the bridge node and a standard node. - let (mut nodes, tasks, _) = - setup_engine(default_test_scroll_rollup_node_config(), 1, chain_spec.clone(), false, false) - .await?; + let tasks = TaskManager::current(); + let (mut nodes, _, _) = setup_engine( + &tasks, + default_test_scroll_rollup_node_config(), + 1, + chain_spec.clone(), + false, + false, + None, + ) + .await?; let mut bridge_node = nodes.pop().unwrap(); let bridge_peer_id = bridge_node.network.record().id; let bridge_node_l1_watcher_tx = bridge_node.inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); @@ -559,9 +570,17 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() let chain_spec = (*SCROLL_MAINNET).clone(); // Launch a node - let (mut nodes, _tasks, _) = - setup_engine(default_test_scroll_rollup_node_config(), 1, chain_spec.clone(), false, false) - .await?; + let tasks = TaskManager::current(); + let (mut nodes, _, _) = setup_engine( + &tasks, + default_test_scroll_rollup_node_config(), + 1, + chain_spec.clone(), + false, + false, + None, + ) + .await?; let node = nodes.pop().unwrap(); // Instantiate the rollup node manager. @@ -829,8 +848,9 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() - config.signer_args.private_key = Some(PrivateKeySigner::random()); // Launch a node - let (mut nodes, _tasks, _) = - setup_engine(config.clone(), 1, chain_spec.clone(), false, false).await?; + let tasks = TaskManager::current(); + let (mut nodes, _, _) = + setup_engine(&tasks, config.clone(), 1, chain_spec.clone(), false, false, None).await?; let node = nodes.pop().unwrap(); // Instantiate the rollup node manager. diff --git a/crates/node/tests/l1_sync.rs b/crates/node/tests/l1_sync.rs index fdead19c..9c46bf46 100644 --- a/crates/node/tests/l1_sync.rs +++ b/crates/node/tests/l1_sync.rs @@ -696,6 +696,7 @@ async fn test_l1_reorg_batch_revert() -> eyre::Result<()> { // Step 5: Perform L1 reorg to remove the BatchRevert event (reorg depth 2) fixture.anvil_reorg(1).await?; + fixture.anvil_mine_blocks(1).await?; fixture.expect_event().l1_reorg().await?; // Step 6: Verify safe head restored to pre-revert state @@ -710,3 +711,447 @@ async fn test_l1_reorg_batch_revert() -> eyre::Result<()> { Ok(()) } + +/// Test: Node can correctly process `BatchCommit` events after reboot. +/// +/// # Test Flow +/// 1. Setup and complete L1 sync. +/// 2. Send `BatchCommit` transactions (batches 0-3) and wait for consolidation. +/// 3. Verify safe head advanced. +/// 4. Shutdown the node. +/// 5. Send more `BatchCommit` transactions (batches 4-6) while node is down. 
+/// 6. Start the node and sync L1. +/// 7. Wait for batch consolidation events (batches 4-6). +/// 8. Verify safe head continues to advance correctly after reboot. +#[tokio::test] +#[cfg_attr(not(feature = "test-utils"), ignore)] +async fn test_l1_sync_commit_batch_after_reboot() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + // Step 1: Setup and complete L1 sync + let mut fixture = TestFixture::builder() + .followers(1) + .skip_l1_synced_notifications() + .with_anvil(None, None, Some(22222222), None, None) + .build() + .await?; + + fixture.l1().sync().await?; + fixture.expect_event().l1_synced().await?; + + // Step 2: Send BatchCommit transactions (batches 0-3) + for i in 0..=3 { + let commit_batch_tx = read_test_transaction("commitBatch", &i.to_string())?; + fixture.anvil_inject_tx(commit_batch_tx).await?; + } + for _ in 1..=3 { + fixture.expect_event().batch_consolidated().await?; + } + + let status_before_reboot = fixture.get_status(0).await?; + let safe_before_reboot = status_before_reboot.l2.fcs.safe_block_info().number; + tracing::info!("Safe head before reboot: {}", safe_before_reboot); + assert!(safe_before_reboot > 0, "Safe head should have advanced"); + + // Step 3: Shutdown the node + tracing::info!("Shutting down node..."); + fixture.shutdown_node(0).await?; + + // Step 4: Send more BatchCommit transactions (batches 4-6) + for i in 4..=6 { + let commit_batch_tx = read_test_transaction("commitBatch", &i.to_string())?; + fixture.anvil_inject_tx(commit_batch_tx).await?; + } + + // Step 5: Start the node and sync + fixture.start_node(0).await?; + fixture.l1().sync().await?; + fixture.expect_event().l1_synced().await?; + + for _ in 4..=6 { + fixture.expect_event().batch_consolidated().await?; + } + + // Step 6: Verify safe head continues to advance + let status_after_reboot = fixture.get_status(0).await?; + let safe_after_reboot = status_after_reboot.l2.fcs.safe_block_info().number; + tracing::info!("Safe head after reboot: {}", safe_after_reboot); + assert!( + safe_after_reboot > safe_before_reboot, + "Safe head should continue advancing after reboot" + ); + + Ok(()) +} + +/// Test: Node can correctly process `BatchFinalized` events after reboot. +/// +/// # Test Flow +/// 1. Setup and complete L1 sync. +/// 2. Send `BatchCommit` transactions (batches 0-6) and wait for consolidation. +/// 3. Send `BatchFinalized` transactions (batches 1-3) and wait for indexing. +/// 4. Verify finalized head advanced. +/// 5. Shutdown the node. +/// 6. Send more `BatchFinalized` transactions (batches 4-6) while node is down. +/// 7. Start the node and sync L1. +/// 8. Wait for batch finalize indexed events (batches 4-6). +/// 9. Verify finalized head continues to advance correctly after reboot.
+#[tokio::test] +#[cfg_attr(not(feature = "test-utils"), ignore)] +async fn test_l1_sync_finalize_batch_after_reboot() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + // Step 1: Setup and complete L1 sync + let mut fixture = TestFixture::builder() + .followers(1) + .skip_l1_synced_notifications() + .with_anvil(None, None, Some(22222222), None, None) + .build() + .await?; + + fixture.l1().sync().await?; + fixture.expect_event().l1_synced().await?; + + // Step 2: Send BatchCommit transactions (batches 0-6) + for i in 0..=6 { + let commit_batch_tx = read_test_transaction("commitBatch", &i.to_string())?; + fixture.anvil_inject_tx(commit_batch_tx).await?; + } + for _ in 1..=6 { + fixture.expect_event().batch_consolidated().await?; + } + + let status_after_commit = fixture.get_status(0).await?; + let safe_after_commit = status_after_commit.l2.fcs.safe_block_info().number; + tracing::info!("Safe head after commits: {}", safe_after_commit); + + // Step 3: Send BatchFinalized transactions (batches 1-3) + for i in 1..=3 { + let finalize_batch_tx = read_test_transaction("finalizeBatch", &i.to_string())?; + fixture.anvil_inject_tx(finalize_batch_tx).await?; + } + for _ in 1..=3 { + fixture.expect_event().batch_finalize_indexed().await?; + } + + let status_before_reboot = fixture.get_status(0).await?; + let finalized_before_reboot = status_before_reboot.l2.fcs.finalized_block_info().number; + tracing::info!("Finalized head before reboot: {}", finalized_before_reboot); + assert!(finalized_before_reboot > 0, "Finalized head should have advanced"); + + // Step 4: Shutdown the node + tracing::info!("Shutting down node..."); + fixture.shutdown_node(0).await?; + + // Step 5: Send more BatchFinalized transactions (batches 4-6) + for i in 4..=6 { + let finalize_batch_tx = read_test_transaction("finalizeBatch", &i.to_string())?; + fixture.anvil_inject_tx(finalize_batch_tx).await?; + } + + // Step 6: Start the node and sync + fixture.start_node(0).await?; + fixture.l1().sync().await?; + fixture.expect_event().l1_synced().await?; + + for _ in 4..=6 { + fixture.expect_event().batch_finalize_indexed().await?; + } + + // Step 7: Verify finalized head continues to advance + let status_after_reboot = fixture.get_status(0).await?; + let finalized_after_reboot = status_after_reboot.l2.fcs.finalized_block_info().number; + tracing::info!("Finalized head after reboot: {}", finalized_after_reboot); + assert!( + finalized_after_reboot > finalized_before_reboot, + "Finalized head should continue advancing after reboot" + ); + + Ok(()) +} + +/// Test: Node can correctly process `BatchRevert` events after reboot. +/// +/// # Test Flow +/// 1. Setup and complete L1 sync. +/// 2. Send `BatchCommit` transactions (batches 0-6) and wait for consolidation. +/// 3. Record safe head. +/// 4. Shutdown the node. +/// 5. Send `BatchRevert` transaction while node is down. +/// 6. Start the node and sync L1. +/// 7. Wait for batch reverted event. +/// 8. Verify safe head decreased correctly.
+#[tokio::test] +#[cfg_attr(not(feature = "test-utils"), ignore)] +async fn test_l1_sync_revert_batch_after_reboot() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + // Step 1: Setup and complete L1 sync + let mut fixture = TestFixture::builder() + .followers(1) + .skip_l1_synced_notifications() + .with_anvil(None, None, Some(22222222), None, None) + .build() + .await?; + + fixture.l1().sync().await?; + fixture.expect_event().l1_synced().await?; + + // Step 2: Send BatchCommit transactions (batches 0-6) + for i in 0..=6 { + let commit_batch_tx = read_test_transaction("commitBatch", &i.to_string())?; + fixture.anvil_inject_tx(commit_batch_tx).await?; + } + for _ in 1..=6 { + fixture.expect_event().batch_consolidated().await?; + } + + let status_before_reboot = fixture.get_status(0).await?; + let safe_before_reboot = status_before_reboot.l2.fcs.safe_block_info().number; + tracing::info!("Safe head before reboot: {}", safe_before_reboot); + + // Step 3: Shutdown the node + tracing::info!("Shutting down node..."); + fixture.shutdown_node(0).await?; + + // Step 4: Send BatchRevert transaction + let revert_batch_tx = read_test_transaction("revertBatch", "0")?; + fixture.anvil_inject_tx(revert_batch_tx).await?; + + // Step 5: Start the node and sync + fixture.start_node(0).await?; + fixture.l1().sync().await?; + fixture.expect_event().l1_synced().await?; + + fixture.expect_event().batch_reverted().await?; + + // Step 6: Verify safe head decreased + let status_after_revert = fixture.get_status(0).await?; + let safe_after_revert = status_after_revert.l2.fcs.safe_block_info().number; + tracing::info!("Safe head after revert: {}", safe_after_revert); + assert!( + safe_after_revert < safe_before_reboot, + "Safe head should decrease after BatchRevert following reboot" + ); + + Ok(()) +} + +/// Test: Node can correctly process `BatchCommit` events after reboot + L1 reorg. +/// +/// # Test Flow +/// 1. Setup and complete L1 sync. +/// 2. Send `BatchCommit` transactions (batches 0-3) and wait for consolidation. +/// 3. Record safe head. +/// 4. Shutdown the node. +/// 5. Trigger L1 reorg (depth 1) to remove some `BatchCommit` events. +/// 6. Start the node and sync L1. +/// 7. Wait for L1 synced event. +/// 8. Verify safe head rolled back correctly after L1 reorg.
+#[tokio::test] +#[cfg_attr(not(feature = "test-utils"), ignore)] +async fn test_l1_reorg_commit_batch_after_reboot() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + // Step 1: Setup and complete L1 sync + let mut fixture = TestFixture::builder() + .followers(1) + .skip_l1_synced_notifications() + .with_anvil(None, None, Some(22222222), None, None) + .build() + .await?; + + fixture.l1().sync().await?; + fixture.expect_event().l1_synced().await?; + + // Step 2: Send BatchCommit transactions (batches 0-3) + for i in 0..=3 { + let commit_batch_tx = read_test_transaction("commitBatch", &i.to_string())?; + fixture.anvil_inject_tx(commit_batch_tx).await?; + } + for _ in 1..=3 { + fixture.expect_event().batch_consolidated().await?; + } + + let status_before_reorg = fixture.get_status(0).await?; + let safe_before_reorg = status_before_reorg.l2.fcs.safe_block_info().number; + assert!(safe_before_reorg > 0, "Safe head should have advanced"); + tracing::info!("Safe head before reboot: {}", safe_before_reorg); + + // Step 3: Shutdown the node + tracing::info!("Shutting down node..."); + fixture.shutdown_node(0).await?; + + // Step 4: Trigger L1 reorg (removes some BatchCommit events) + fixture.anvil_reorg(1).await?; + fixture.anvil_mine_blocks(1).await?; + + // Step 5: Start the node and sync + fixture.start_node(0).await?; + fixture.l1().sync().await?; + fixture.expect_event().l1_synced().await?; + + // Step 6: Verify safe head rolled back after the L1 reorg + let status_after_reorg = fixture.get_status(0).await?; + let safe_after_reorg = status_after_reorg.l2.fcs.safe_block_info().number; + tracing::info!("Safe head after reorg: {}", safe_after_reorg); + assert!(safe_before_reorg > safe_after_reorg, "Safe head should roll back after L1 reorg"); + + Ok(()) +} + +/// Test: Node can correctly handle L1 reorg of `BatchFinalized` events after reboot. +/// +/// # Test Flow +/// 1. Setup and complete L1 sync. +/// 2. Send `BatchCommit` transactions (batches 0-6) and wait for consolidation. +/// 3. Send `BatchFinalized` transactions (batches 1-3) and wait for indexing. +/// 4. Record finalized head. +/// 5. Shutdown the node. +/// 6. Trigger L1 reorg (depth 2) to remove `BatchFinalized` events. +/// 7. Start the node and sync L1. +/// 8. Verify finalized head remains the same (the reorg only removed unfinalized L1 blocks).
+#[tokio::test] +#[cfg_attr(not(feature = "test-utils"), ignore)] +async fn test_l1_reorg_finalize_batch_after_reboot() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + // Step 1: Setup and complete L1 sync + let mut fixture = TestFixture::builder() + .followers(1) + .skip_l1_synced_notifications() + .with_anvil(None, None, Some(22222222), None, Some(32)) + .build() + .await?; + + fixture.l1().sync().await?; + fixture.expect_event().l1_synced().await?; + + // Step 2: Send BatchCommit transactions (batches 0-6) + for i in 0..=6 { + let commit_batch_tx = read_test_transaction("commitBatch", &i.to_string())?; + fixture.anvil_inject_tx(commit_batch_tx).await?; + } + for _ in 1..=6 { + fixture.expect_event().batch_consolidated().await?; + } + + // Step 3: Send BatchFinalized transactions (batches 1-3) + for i in 1..=3 { + let finalize_batch_tx = read_test_transaction("finalizeBatch", &i.to_string())?; + fixture.anvil_inject_tx(finalize_batch_tx).await?; + } + for _ in 1..=3 { + fixture.expect_event().batch_finalize_indexed().await?; + } + + let status_before_reboot = fixture.get_status(0).await?; + let finalized_before_reboot = status_before_reboot.l2.fcs.finalized_block_info().number; + tracing::info!("Finalized head before reboot: {}", finalized_before_reboot); + assert_eq!(finalized_before_reboot, 0, "Finalized head should be 0"); + + // Step 4: Shutdown the node + tracing::info!("Shutting down node..."); + fixture.shutdown_node(0).await?; + + // Step 5: Trigger L1 reorg (removes BatchFinalized events) + fixture.anvil_reorg(2).await?; + fixture.anvil_mine_blocks(2).await?; + + // Step 6: Start the node and sync + fixture.start_node(0).await?; + fixture.l1().sync().await?; + fixture.expect_event().l1_synced().await?; + + // Step 7: Verify finalized head remains the same + let status_after_reorg = fixture.get_status(0).await?; + let finalized_after_reorg = status_after_reorg.l2.fcs.finalized_block_info().number; + tracing::info!("Finalized head after reorg: {}", finalized_after_reorg); + assert_eq!( + finalized_after_reorg, finalized_before_reboot, + "Finalized head should be the same after L1 reorg" + ); + + Ok(()) +} + +/// Test: Node can correctly handle L1 reorg of `BatchRevert` events after reboot. +/// +/// # Test Flow +/// 1. Setup and complete L1 sync. +/// 2. Send `BatchCommit` transactions (batches 0-6) and wait for consolidation. +/// 3. Record safe head after commits. +/// 4. Send `BatchRevert` transaction and wait for batch reverted event. +/// 5. Verify safe head decreased after revert. +/// 6. Shutdown the node. +/// 7. Trigger L1 reorg (depth 1) to remove the `BatchRevert` event. +/// 8. Start the node and sync L1. +/// 9. Verify safe head is restored to pre-revert state (reorg undid the revert).
+#[tokio::test] +#[cfg_attr(not(feature = "test-utils"), ignore)] +async fn test_l1_reorg_revert_batch_after_reboot() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + // Step 1: Setup and complete L1 sync + let mut fixture = TestFixture::builder() + .followers(1) + .skip_l1_synced_notifications() + .with_anvil(None, None, Some(22222222), None, None) + .build() + .await?; + + fixture.l1().sync().await?; + fixture.expect_event().l1_synced().await?; + + // Step 2: Send BatchCommit transactions (batches 0-6) + for i in 0..=6 { + let commit_batch_tx = read_test_transaction("commitBatch", &i.to_string())?; + fixture.anvil_inject_tx(commit_batch_tx).await?; + } + for _ in 1..=6 { + fixture.expect_event().batch_consolidated().await?; + } + + let status_after_commits = fixture.get_status(0).await?; + let safe_after_commits = status_after_commits.l2.fcs.safe_block_info().number; + tracing::info!("Safe head after commits: {}", safe_after_commits); + + // Step 3: Send BatchRevert transaction + let revert_batch_tx = read_test_transaction("revertBatch", "0")?; + fixture.anvil_inject_tx(revert_batch_tx).await?; + fixture.expect_event().batch_reverted().await?; + + let status_after_revert = fixture.get_status(0).await?; + let safe_after_revert = status_after_revert.l2.fcs.safe_block_info().number; + tracing::info!("Safe head after revert: {}", safe_after_revert); + assert!(safe_after_revert < safe_after_commits, "Safe head should decrease after BatchRevert"); + + // Step 4: Shutdown the node + tracing::info!("Shutting down node..."); + fixture.shutdown_node(0).await?; + + // Step 5: Trigger L1 reorg (removes BatchRevert event) + fixture.anvil_reorg(1).await?; + fixture.anvil_mine_blocks(1).await?; + + // Step 6: Start the node and sync + fixture.start_node(0).await?; + fixture.l1().sync().await?; + fixture.expect_event().l1_synced().await?; + + // Step 7: Verify safe head is restored to pre-revert state + let status_after_reorg = fixture.get_status(0).await?; + let safe_after_reorg = status_after_reorg.l2.fcs.safe_block_info().number; + tracing::info!("Safe head after reorg: {}", safe_after_reorg); + assert_eq!( + safe_after_reorg, safe_after_commits, + "Safe head should be restored after L1 reorg removes BatchRevert event" + ); + + Ok(()) +} diff --git a/crates/node/tests/sync.rs b/crates/node/tests/sync.rs index e37a61de..05438fff 100644 --- a/crates/node/tests/sync.rs +++ b/crates/node/tests/sync.rs @@ -6,6 +6,7 @@ use reqwest::Url; use reth_provider::{BlockIdReader, BlockReader}; use reth_rpc_eth_api::helpers::EthTransactions; use reth_scroll_chainspec::{SCROLL_DEV, SCROLL_SEPOLIA}; +use reth_tasks::TaskManager; use reth_tokio_util::EventStream; use rollup_node::{ test_utils::{ @@ -79,8 +80,9 @@ async fn test_should_consolidate_to_block_15k() -> eyre::Result<()> { }; let chain_spec = (*SCROLL_SEPOLIA).clone(); - let (mut nodes, _tasks, _) = - setup_engine(node_config, 1, chain_spec.clone(), false, false).await?; + let tasks = TaskManager::current(); + let (mut nodes, _, _) = + setup_engine(&tasks, node_config, 1, chain_spec.clone(), false, false, None).await?; let node = nodes.pop().unwrap(); // We perform consolidation up to block 15k. This allows us to capture a batch revert event at @@ -550,17 +552,27 @@ async fn test_chain_orchestrator_l1_reorg() -> eyre::Result<()> { let chain_spec = (*SCROLL_DEV).clone(); // Create a sequencer node and an unsynced node.
- let (mut nodes, _tasks, _) = - setup_engine(sequencer_node_config.clone(), 1, chain_spec.clone(), false, false) - .await - .unwrap(); + let tasks = TaskManager::current(); + let (mut nodes, _, _) = setup_engine( + &tasks, + sequencer_node_config.clone(), + 1, + chain_spec.clone(), + false, + false, + None, + ) + .await + .unwrap(); let mut sequencer = nodes.pop().unwrap(); let sequencer_handle = sequencer.inner.rollup_manager_handle.clone(); let mut sequencer_events = sequencer_handle.get_event_listener().await?; let sequencer_l1_watcher_tx = sequencer.inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); - let (mut nodes, _tasks, _) = - setup_engine(node_config.clone(), 1, chain_spec.clone(), false, false).await.unwrap(); + let (mut nodes, _, _) = + setup_engine(&tasks, node_config.clone(), 1, chain_spec.clone(), false, false, None) + .await + .unwrap(); let mut follower = nodes.pop().unwrap(); let mut follower_events = follower.inner.rollup_manager_handle.get_event_listener().await?; let follower_l1_watcher_tx = follower.inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); diff --git a/crates/sequencer/Cargo.toml b/crates/sequencer/Cargo.toml index 7a838b04..219dc75f 100644 --- a/crates/sequencer/Cargo.toml +++ b/crates/sequencer/Cargo.toml @@ -53,6 +53,7 @@ scroll-alloy-consensus.workspace = true # reth reth-e2e-test-utils.workspace = true reth-node-core.workspace = true +reth-tasks.workspace = true reth-tracing.workspace = true # reth-scroll diff --git a/crates/sequencer/tests/e2e.rs b/crates/sequencer/tests/e2e.rs index 85f2d7d8..3d66ef8b 100644 --- a/crates/sequencer/tests/e2e.rs +++ b/crates/sequencer/tests/e2e.rs @@ -6,6 +6,7 @@ use futures::stream::StreamExt; use reth_e2e_test_utils::transaction::TransactionTestContext; use reth_scroll_chainspec::SCROLL_DEV; use reth_scroll_node::test_utils::setup; +use reth_tasks::TaskManager; use rollup_node::{ constants::SCROLL_GAS_LIMIT, test_utils::{default_test_scroll_rollup_node_config, setup_engine}, @@ -211,10 +212,18 @@ async fn can_build_blocks_with_delayed_l1_messages() { const L1_MESSAGE_DELAY: u64 = 2; // setup a test node - let (mut nodes, _tasks, wallet) = - setup_engine(default_test_scroll_rollup_node_config(), 1, chain_spec, false, false) - .await - .unwrap(); + let tasks = TaskManager::current(); + let (mut nodes, _, wallet) = setup_engine( + &tasks, + default_test_scroll_rollup_node_config(), + 1, + chain_spec, + false, + false, + None, + ) + .await + .unwrap(); let node = nodes.pop().unwrap(); let wallet = Arc::new(Mutex::new(wallet)); @@ -336,10 +345,17 @@ async fn can_build_blocks_with_finalized_l1_messages() { let chain_spec = SCROLL_DEV.clone(); // setup a test node - let (mut nodes, _tasks, wallet) = - setup_engine(default_test_scroll_rollup_node_config(), 1, chain_spec, false, false) - .await - .unwrap(); + let (mut nodes, _, wallet) = setup_engine( + &TaskManager::current(), + default_test_scroll_rollup_node_config(), + 1, + chain_spec, + false, + false, + None, + ) + .await + .unwrap(); let node = nodes.pop().unwrap(); let wallet = Arc::new(Mutex::new(wallet)); @@ -513,8 +529,9 @@ async fn can_sequence_blocks_with_private_key_file() -> eyre::Result<()> { rpc_args: RpcArgs::default(), }; - let (nodes, _tasks, wallet) = - setup_engine(rollup_manager_args, 1, chain_spec, false, false).await?; + let tasks = TaskManager::current(); + let (nodes, _, wallet) = + setup_engine(&tasks, rollup_manager_args, 1, chain_spec, false, false, None).await?; let wallet = Arc::new(Mutex::new(wallet)); let sequencer_rnm_handle = 
nodes[0].inner.add_ons_handle.rollup_manager_handle.clone(); @@ -616,8 +633,9 @@ async fn can_sequence_blocks_with_hex_key_file_without_prefix() -> eyre::Result< rpc_args: RpcArgs::default(), }; - let (nodes, _tasks, wallet) = - setup_engine(rollup_manager_args, 1, chain_spec, false, false).await?; + let tasks = TaskManager::current(); + let (nodes, _, wallet) = + setup_engine(&tasks, rollup_manager_args, 1, chain_spec, false, false, None).await?; let wallet = Arc::new(Mutex::new(wallet)); let sequencer_rnm_handle = nodes[0].inner.add_ons_handle.rollup_manager_handle.clone(); @@ -677,7 +695,9 @@ async fn can_build_blocks_and_exit_at_gas_limit() { // setup a test node. use a high value for the payload building duration to be sure we don't // exit early. - let (mut nodes, _tasks, wallet) = setup_engine( + let tasks = TaskManager::current(); + let (mut nodes, _, wallet) = setup_engine( + &tasks, ScrollRollupNodeConfig { sequencer_args: SequencerArgs { payload_building_duration: 1000, ..Default::default() }, ..default_test_scroll_rollup_node_config() @@ -686,6 +706,7 @@ async fn can_build_blocks_and_exit_at_gas_limit() { chain_spec, false, false, + None, ) .await .unwrap(); @@ -763,7 +784,9 @@ async fn can_build_blocks_and_exit_at_time_limit() { // setup a test node. use a low payload building duration in order to exit before we reach the // gas limit. - let (mut nodes, _tasks, wallet) = setup_engine( + let tasks = TaskManager::current(); + let (mut nodes, _, wallet) = setup_engine( + &tasks, ScrollRollupNodeConfig { sequencer_args: SequencerArgs { payload_building_duration: 10, ..Default::default() }, ..default_test_scroll_rollup_node_config() @@ -772,6 +795,7 @@ async fn can_build_blocks_and_exit_at_time_limit() { chain_spec, false, false, + None, ) .await .unwrap(); @@ -851,10 +875,18 @@ async fn should_limit_l1_message_cumulative_gas() { // setup a test node let chain_spec = SCROLL_DEV.clone(); - let (mut nodes, _tasks, wallet) = - setup_engine(default_test_scroll_rollup_node_config(), 1, chain_spec, false, false) - .await - .unwrap(); + let tasks = TaskManager::current(); + let (mut nodes, _, wallet) = setup_engine( + &tasks, + default_test_scroll_rollup_node_config(), + 1, + chain_spec, + false, + false, + None, + ) + .await + .unwrap(); let node = nodes.pop().unwrap(); let wallet = Arc::new(Mutex::new(wallet)); @@ -968,10 +1000,18 @@ async fn should_not_add_skipped_messages() { // setup a test node let chain_spec = SCROLL_DEV.clone(); - let (mut nodes, _tasks, wallet) = - setup_engine(default_test_scroll_rollup_node_config(), 1, chain_spec, false, false) - .await - .unwrap(); + let tasks = TaskManager::current(); + let (mut nodes, _, wallet) = setup_engine( + &tasks, + default_test_scroll_rollup_node_config(), + 1, + chain_spec, + false, + false, + None, + ) + .await + .unwrap(); let node = nodes.pop().unwrap(); let wallet = Arc::new(Mutex::new(wallet)); diff --git a/crates/watcher/Cargo.toml b/crates/watcher/Cargo.toml index 4c442cd9..8cd20188 100644 --- a/crates/watcher/Cargo.toml +++ b/crates/watcher/Cargo.toml @@ -27,6 +27,9 @@ rollup-node-primitives.workspace = true rollup-node-providers.workspace = true scroll-l1.workspace = true +# reth +reth-tasks.workspace = true + # scroll scroll-alloy-consensus.workspace = true
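For reference, the unique per-node data directory scheme that `setup_engine` uses above reduces to the following std-only sketch (`test_data_dir` is an illustrative helper name, not a function in the diff):

```rust
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};

/// Process id + node index + a nanosecond timestamp keep directories unique
/// across parallel test processes and across repeated runs in one process.
fn test_data_dir(node_index: usize) -> PathBuf {
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    std::env::temp_dir()
        .join(format!("scroll-test-{}-node-{}-{}", std::process::id(), node_index, nanos))
}

fn main() {
    let dir = test_data_dir(0);
    std::fs::create_dir_all(&dir).expect("failed to create test data directory");
    // ... run a node against `dir`, shut it down, restart it against the same path ...
    // Teardown mirrors `TestFixture::drop`, which removes the whole parent directory
    // (database file plus static files, blob store, and other artifacts).
    let _ = std::fs::remove_dir_all(&dir);
}
```

Because the directory persists for the lifetime of the fixture rather than a single node, `start_node` can point a rebooted node at the same path and reopen the database it left behind.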