Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,11 @@ scroll-alloy-rpc-types-engine = { git = "https://github.com/scroll-tech/reth.git

# reth
reth-chainspec = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false }
reth-db = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false }
reth-e2e-test-utils = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3" }
reth-eth-wire = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false }
reth-eth-wire-types = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false }
reth-fs-util = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false }
reth-network = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false }
reth-network-api = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false }
reth-network-p2p = { git = "https://github.com/scroll-tech/reth.git", tag = "scroll-v91.3", default-features = false }
Expand Down
4 changes: 4 additions & 0 deletions crates/chain-orchestrator/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ pub enum ChainOrchestratorError {
/// An error occurred while handling rollup node primitives.
#[error("An error occurred while handling rollup node primitives: {0}")]
RollupNodePrimitiveError(rollup_node_primitives::RollupNodePrimitiveError),
/// Shutdown requested - this is not a real error but used to signal graceful shutdown.
#[cfg(feature = "test-utils")]
#[error("Shutdown requested")]
Shutdown,
}

impl CanRetry for ChainOrchestratorError {
Expand Down
3 changes: 3 additions & 0 deletions crates/chain-orchestrator/src/handle/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub enum ChainOrchestratorCommand<N: FullNetwork<Primitives = ScrollNetworkPrimi
/// Returns a database handle for direct database access.
#[cfg(feature = "test-utils")]
DatabaseHandle(oneshot::Sender<std::sync::Arc<scroll_db::Database>>),
/// Request the `ChainOrchestrator` to shutdown immediately.
#[cfg(feature = "test-utils")]
Shutdown(oneshot::Sender<()>),
}

/// The database queries that can be sent to the rollup manager.
Expand Down
9 changes: 9 additions & 0 deletions crates/chain-orchestrator/src/handle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,13 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ChainOrchestratorHand
self.send_command(ChainOrchestratorCommand::DatabaseHandle(tx));
rx.await
}

/// Sends a command to shutdown the `ChainOrchestrator` immediately.
/// This will cause the `ChainOrchestrator`'s event loop to exit gracefully.
#[cfg(feature = "test-utils")]
pub async fn shutdown(&self) -> Result<(), oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
self.send_command(ChainOrchestratorCommand::Shutdown(tx));
rx.await
}
}
19 changes: 17 additions & 2 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,16 @@ impl<
break;
}
Some(command) = self.handle_rx.recv() => {
if let Err(err) = self.handle_command(command).await {
tracing::error!(target: "scroll::chain_orchestrator", ?err, "Error handling command");
match self.handle_command(command).await {
#[cfg(feature = "test-utils")]
Err(ChainOrchestratorError::Shutdown) => {
tracing::info!(target: "scroll::chain_orchestrator", "Shutdown requested, exiting gracefully");
break;
}
Err(err) => {
tracing::error!(target: "scroll::chain_orchestrator", ?err, "Error handling command");
}
Ok(_) => {}
}
}
Some(event) = async {
Expand Down Expand Up @@ -410,6 +418,13 @@ impl<
ChainOrchestratorCommand::DatabaseHandle(tx) => {
let _ = tx.send(self.database.clone());
}
#[cfg(feature = "test-utils")]
ChainOrchestratorCommand::Shutdown(tx) => {
tracing::info!(target: "scroll::chain_orchestrator", "Received shutdown command, exiting event loop");
let _ = tx.send(());
// Return an error to signal shutdown
return Err(ChainOrchestratorError::Shutdown);
}
}

Ok(())
Expand Down
5 changes: 5 additions & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ anvil = { git = "https://github.com/foundry-rs/foundry.git", rev = "2c84e1c970d1
alloy-rpc-types-eth = { workspace = true, optional = true }
alloy-rpc-types-engine = { workspace = true, optional = true }
alloy-rpc-types-anvil = { workspace = true, optional = true }
reth-db = { workspace = true, optional = true, features = ["test-utils"] }
reth-e2e-test-utils = { workspace = true, optional = true }
reth-engine-local = { workspace = true, optional = true }
reth-fs-util = { workspace = true, optional = true }
reth-provider = { workspace = true, optional = true }
reth-rpc-layer = { workspace = true, optional = true }
reth-rpc-server-types = { workspace = true, optional = true }
Expand Down Expand Up @@ -112,6 +114,7 @@ console-subscriber = "0.5.0"
alloy-chains.workspace = true
alloy-eips.workspace = true
futures.workspace = true
reth-db = { workspace = true, features = ["test-utils"] }
reth-e2e-test-utils.workspace = true
reth-node-core.workspace = true
reth-provider.workspace = true
Expand All @@ -132,7 +135,9 @@ alloy-rpc-types-eth = { workspace = true }
[features]
js-tracer = ["reth-scroll-node/js-tracer", "reth-scroll-rpc/js-tracer"]
test-utils = [
"reth-db",
"reth-engine-local",
"reth-fs-util",
"reth-trie-db/test-utils",
"reth-chainspec/test-utils",
"reth-evm/test-utils",
Expand Down
1 change: 1 addition & 0 deletions crates/node/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl ScrollRollupNodeConfig {
) -> eyre::Result<()> {
// Instantiate the database
let db_path = node_config.datadir().db();

let database_path = if let Some(database_path) = &self.database_args.rn_db_path {
database_path.to_string_lossy().to_string()
} else {
Expand Down
3 changes: 2 additions & 1 deletion crates/node/src/test_utils/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ impl<'a> BlockBuilder<'a> {

/// Build the block and validate against expectations.
pub async fn build_and_await_block(self) -> eyre::Result<ScrollBlock> {
let sequencer_node = &self.fixture.nodes[0];
let sequencer_node =
self.fixture.nodes[0].as_ref().expect("sequencer node has been shutdown");

// Get the sequencer from the rollup manager handle
let handle = &sequencer_node.rollup_manager_handle;
Expand Down
5 changes: 4 additions & 1 deletion crates/node/src/test_utils/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ impl<'a> DatabaseHelper<'a> {

/// Get the database for this node.
async fn database(&self) -> eyre::Result<Arc<Database>> {
Ok(self.fixture.nodes[self.node_index].rollup_manager_handle.get_database_handle().await?)
let node = self.fixture.nodes[self.node_index]
.as_ref()
.ok_or_else(|| eyre::eyre!("Node at index {} has been shutdown", self.node_index))?;
Ok(node.rollup_manager_handle.get_database_handle().await?)
}

/// Get the finalized block number of a batch by its index.
Expand Down
10 changes: 8 additions & 2 deletions crates/node/src/test_utils/event_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ impl<'a> EventWaiter<'a> {
) -> eyre::Result<Vec<ChainOrchestratorEvent>> {
let mut matched_events = Vec::new();
for node in self.node_indices {
let events = &mut self.fixture.nodes[node].chain_orchestrator_rx;
let Some(node_handle) = &mut self.fixture.nodes[node] else {
continue; // Skip shutdown nodes
};
let events = &mut node_handle.chain_orchestrator_rx;
let mut node_matched_events = Vec::new();

let result = timeout(self.timeout_duration, async {
Expand Down Expand Up @@ -282,7 +285,10 @@ impl<'a> EventWaiter<'a> {
continue;
}

let events = &mut self.fixture.nodes[node_index].chain_orchestrator_rx;
let node_handle = self.fixture.nodes[node_index].as_mut().ok_or_else(|| {
eyre::eyre!("Node at index {} has been shutdown", node_index)
})?;
let events = &mut node_handle.chain_orchestrator_rx;

// Try to get the next event (non-blocking with try_next)
if let Some(event) = events.next().now_or_never() {
Expand Down
86 changes: 63 additions & 23 deletions crates/node/src/test_utils/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use alloy_transport::layers::RetryBackoffLayer;
use reth_chainspec::EthChainSpec;
use reth_e2e_test_utils::{wallet::Wallet, NodeHelperType, TmpDB};
use reth_eth_wire_types::BasicNetworkPrimitives;
use reth_fs_util::remove_dir_all;
use reth_network::NetworkHandle;
use reth_node_builder::NodeTypes;
use reth_node_types::NodeTypesWithDBAdapter;
Expand All @@ -47,15 +48,21 @@ use tokio::sync::{mpsc, Mutex};
/// Main test fixture providing a high-level interface for testing rollup nodes.
pub struct TestFixture {
/// The list of nodes in the test setup.
pub nodes: Vec<NodeHandle>,
/// Using Option to allow nodes to be shutdown without changing indices.
/// A None value means the node at that index has been shutdown.
pub nodes: Vec<Option<NodeHandle>>,
/// Database references for each node, used for reboot scenarios.
pub dbs: Vec<Arc<reth_db::test_utils::TempDatabase<reth_db::DatabaseEnv>>>,
/// Shared wallet for generating transactions.
pub wallet: Arc<Mutex<Wallet>>,
/// Chain spec used by the nodes.
pub chain_spec: Arc<<ScrollRollupNode as NodeTypes>::ChainSpec>,
/// Optional Anvil instance for L1 simulation.
pub anvil: Option<anvil::NodeHandle>,
/// The task manager. Held in order to avoid dropping the node.
_tasks: TaskManager,
pub tasks: TaskManager,
/// The configuration for the nodes.
pub config: ScrollRollupNodeConfig,
}

impl Debug for TestFixture {
Expand All @@ -70,6 +77,21 @@ impl Debug for TestFixture {
}
}

impl Drop for TestFixture {
fn drop(&mut self) {
// Manually cleanup test directories.
// TempDatabase's automatic drop only removes the database file itself,
// but the parent directory also contains other files (static files, blob store, etc.)
// that need to be cleaned up to avoid accumulating test artifacts.
let parent_paths: Vec<_> =
self.dbs.iter().filter_map(|db| db.path().parent().map(|p| p.to_path_buf())).collect();
// Delete parent directories containing all database files
for path in parent_paths {
let _ = remove_dir_all(&path);
}
}
}

/// The network handle to the Scroll network.
pub type ScrollNetworkHandle =
NetworkHandle<BasicNetworkPrimitives<ScrollPrimitives, ScrollPooledTransaction>>;
Expand Down Expand Up @@ -132,19 +154,28 @@ impl TestFixture {
TestFixtureBuilder::new()
}

/// Get a node by index, returning an error if it has been shutdown.
fn get_node(&self, node_index: usize) -> eyre::Result<&NodeHandle> {
self.nodes
.get(node_index)
.ok_or_else(|| eyre::eyre!("Node index {} out of bounds", node_index))?
.as_ref()
.ok_or_else(|| eyre::eyre!("Node at index {} has been shutdown", node_index))
}

/// Get the sequencer node (assumes first node is sequencer).
pub fn sequencer(&mut self) -> &mut NodeHandle {
let handle = &mut self.nodes[0];
let handle = self.nodes[0].as_mut().expect("sequencer node has been shutdown");
assert!(handle.is_sequencer(), "expected sequencer, got follower");
handle
}

/// Get a follower node by index.
pub fn follower(&mut self, index: usize) -> &mut NodeHandle {
if index == 0 && self.nodes[0].is_sequencer() {
return &mut self.nodes[index + 1];
if index == 0 && self.nodes[0].as_ref().map(|n| n.is_sequencer()).unwrap_or(false) {
return self.nodes[index + 1].as_mut().expect("follower node has been shutdown");
}
&mut self.nodes[index]
self.nodes[index].as_mut().expect("follower node has been shutdown")
}

/// Get the wallet.
Expand Down Expand Up @@ -178,16 +209,21 @@ impl TestFixture {
node_index: usize,
tx: impl Into<alloy_primitives::Bytes>,
) -> eyre::Result<()> {
self.nodes[node_index].node.rpc.inject_tx(tx.into()).await?;
let node = self.nodes[node_index]
.as_ref()
.ok_or_else(|| eyre::eyre!("Node at index {} has been shutdown", node_index))?;
node.node.rpc.inject_tx(tx.into()).await?;
Ok(())
}

/// Get the current (latest) block from a specific node.
pub async fn get_block(&self, node_index: usize) -> eyre::Result<Block<Transaction>> {
use reth_rpc_api::EthApiServer;

self.nodes[node_index]
.node
let node = self.nodes[node_index]
.as_ref()
.ok_or_else(|| eyre::eyre!("Node at index {} has been shutdown", node_index))?;
node.node
.rpc
.inner
.eth_api()
Expand All @@ -206,8 +242,8 @@ impl TestFixture {
&self,
node_index: usize,
) -> eyre::Result<rollup_node_chain_orchestrator::ChainOrchestratorStatus> {
self.nodes[node_index]
.rollup_manager_handle
let node = self.get_node(node_index)?;
node.rollup_manager_handle
.status()
.await
.map_err(|e| eyre::eyre!("Failed to get status: {}", e))
Expand Down Expand Up @@ -560,8 +596,7 @@ impl TestFixtureBuilder {
}

/// Build the test fixture.
pub async fn build(self) -> eyre::Result<TestFixture> {
let mut config = self.config;
pub async fn build(mut self) -> eyre::Result<TestFixture> {
let chain_spec = self.chain_spec.unwrap_or_else(|| SCROLL_DEV.clone());

// Start Anvil if requested
Expand All @@ -582,22 +617,25 @@ impl TestFixtureBuilder {
.map_err(|e| eyre::eyre!("Failed to parse Anvil endpoint URL: {}", e))?;

// Configure L1 provider and blob provider to use Anvil
config.l1_provider_args.url = Some(endpoint_url.clone());
config.l1_provider_args.logs_query_block_range = 500;
config.blob_provider_args.anvil_url = Some(endpoint_url);
config.blob_provider_args.mock = false;
self.config.l1_provider_args.url = Some(endpoint_url.clone());
self.config.l1_provider_args.logs_query_block_range = 500;
self.config.blob_provider_args.anvil_url = Some(endpoint_url);
self.config.blob_provider_args.mock = false;

Some(handle)
} else {
None
};

let (nodes, _tasks, wallet) = setup_engine(
config.clone(),
let tasks = TaskManager::current();
let (nodes, dbs, wallet) = setup_engine(
&tasks,
self.config.clone(),
self.num_nodes,
chain_spec.clone(),
self.is_dev,
self.no_local_transactions_propagation,
None,
)
.await?;

Expand All @@ -622,26 +660,28 @@ impl TestFixtureBuilder {
let chain_orchestrator_rx =
node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?;

node_handles.push(NodeHandle {
node_handles.push(Some(NodeHandle {
node,
engine,
chain_orchestrator_rx,
l1_watcher_tx,
rollup_manager_handle,
typ: if config.sequencer_args.sequencer_enabled && index == 0 {
typ: if self.config.sequencer_args.sequencer_enabled && index == 0 {
NodeType::Sequencer
} else {
NodeType::Follower
},
});
}));
}

Ok(TestFixture {
nodes: node_handles,
dbs,
wallet: Arc::new(Mutex::new(wallet)),
chain_spec,
_tasks,
tasks,
anvil,
config: self.config,
})
}

Expand Down
Loading
Loading