From 011026926331097a23ef27cf461eed4b79ddae73 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Wed, 28 May 2025 08:53:12 +0200 Subject: [PATCH 1/3] Introduce a struct --- crates/op-rbuilder/src/main.rs | 7 ++++--- crates/op-rbuilder/src/monitor_tx_pool.rs | 25 +++++++++++++++++++---- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/crates/op-rbuilder/src/main.rs b/crates/op-rbuilder/src/main.rs index f378f961..67d182a3 100644 --- a/crates/op-rbuilder/src/main.rs +++ b/crates/op-rbuilder/src/main.rs @@ -5,7 +5,6 @@ use reth_optimism_node::{ node::{OpAddOnsBuilder, OpPoolBuilder}, OpNode, }; -use reth_transaction_pool::TransactionPool; /// CLI argument parsing. pub mod args; @@ -22,7 +21,7 @@ use metrics::{ VersionInfo, BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES, VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA, }; -use monitor_tx_pool::monitor_tx_pool; +use monitor_tx_pool::TransactionPoolMonitor; use revert_protection::{EthApiOverrideServer, RevertProtectionExt}; use tx::FBPooledTransaction; @@ -116,10 +115,12 @@ where VERSION.register_version_metrics(); if builder_args.log_pool_transactions { tracing::info!("Logging pool transactions"); + + let tx_monitor = TransactionPoolMonitor::new(ctx.pool.clone()); ctx.task_executor.spawn_critical( "txlogging", Box::pin(async move { - monitor_tx_pool(ctx.pool.all_transactions_event_listener()).await; + tx_monitor.run().await; }), ); } diff --git a/crates/op-rbuilder/src/monitor_tx_pool.rs b/crates/op-rbuilder/src/monitor_tx_pool.rs index 5bb3a4ea..03217e63 100644 --- a/crates/op-rbuilder/src/monitor_tx_pool.rs +++ b/crates/op-rbuilder/src/monitor_tx_pool.rs @@ -1,11 +1,28 @@ use crate::tx::FBPooledTransaction; use futures_util::StreamExt; -use reth_transaction_pool::{AllTransactionsEvents, FullTransactionEvent}; +use reth_transaction_pool::{FullTransactionEvent, TransactionPool}; use tracing::info; -pub async fn monitor_tx_pool(mut new_transactions: AllTransactionsEvents) { - while let Some(event) = new_transactions.next().await { - transaction_event_log(event); +pub struct TransactionPoolMonitor { + pool: Pool, +} + +impl TransactionPoolMonitor { + pub fn new(pool: Pool) -> Self { + Self { pool } + } +} + +impl TransactionPoolMonitor +where + Pool: TransactionPool + Clone + 'static, +{ + pub async fn run(self) { + let mut new_transactions = self.pool.all_transactions_event_listener(); + + while let Some(event) = new_transactions.next().await { + transaction_event_log(event); + } } } From 50b8c16ba6dfea98e191f6e50106b1f7ef05db31 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Wed, 28 May 2025 09:50:52 +0200 Subject: [PATCH 2/3] Try with a webscoket subscription --- Cargo.lock | 21 ++++++++++++ crates/op-rbuilder/Cargo.toml | 2 +- crates/op-rbuilder/src/main.rs | 10 ++++-- crates/op-rbuilder/src/monitor_tx_pool.rs | 40 +++++++++++++++++++++-- 4 files changed, 66 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ce9001e..66cd8b3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5597,6 +5597,7 @@ dependencies = [ "reth-transaction-pool", "reth-trie", "revm", + "ringbuf", "rollup-boost", "secp256k1", "serde", @@ -6036,6 +6037,15 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" +[[package]] +name = "portable-atomic-util" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -9716,6 +9726,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ringbuf" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe47b720588c8702e34b5979cb3271a8b1842c7cb6f57408efa70c779363488c" +dependencies = [ + "crossbeam-utils", + "portable-atomic", + "portable-atomic-util", +] + [[package]] name = "ringbuffer" version = "0.15.0" diff --git a/crates/op-rbuilder/Cargo.toml b/crates/op-rbuilder/Cargo.toml index dbefc829..d616bfeb 100644 --- a/crates/op-rbuilder/Cargo.toml +++ b/crates/op-rbuilder/Cargo.toml @@ -100,7 +100,7 @@ rand = "0.9.0" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } shellexpand = "3.1" serde_yaml = { version = "0.9" } - +ringbuf = "0.4" # `msozin/flashblocks-v1.4.1` branch based on `flashblocks-rebase` rollup-boost = { git = "http://github.com/flashbots/rollup-boost", rev = "8506dfb7d84c65746f7c88d250983658438f59e8" } diff --git a/crates/op-rbuilder/src/main.rs b/crates/op-rbuilder/src/main.rs index 67d182a3..029e805f 100644 --- a/crates/op-rbuilder/src/main.rs +++ b/crates/op-rbuilder/src/main.rs @@ -21,7 +21,7 @@ use metrics::{ VersionInfo, BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES, VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA, }; -use monitor_tx_pool::TransactionPoolMonitor; +use monitor_tx_pool::{EthPubSubApiServer, TransactionPoolMonitor}; use revert_protection::{EthApiOverrideServer, RevertProtectionExt}; use tx::FBPooledTransaction; @@ -103,7 +103,10 @@ where let pool = ctx.pool().clone(); let provider = ctx.provider().clone(); - let revert_protection_ext = RevertProtectionExt::new(pool, provider); + let revert_protection_ext = RevertProtectionExt::new(pool.clone(), provider); + + let tx_monitor = TransactionPoolMonitor::new(pool, true); + ctx.modules.merge_configured(tx_monitor.into_rpc())?; ctx.modules .merge_configured(revert_protection_ext.into_rpc())?; @@ -116,13 +119,14 @@ where if builder_args.log_pool_transactions { tracing::info!("Logging pool transactions"); - let tx_monitor = TransactionPoolMonitor::new(ctx.pool.clone()); + /* ctx.task_executor.spawn_critical( "txlogging", Box::pin(async move { tx_monitor.run().await; }), ); + */ } Ok(()) diff --git a/crates/op-rbuilder/src/monitor_tx_pool.rs b/crates/op-rbuilder/src/monitor_tx_pool.rs index 03217e63..9f7664d2 100644 --- a/crates/op-rbuilder/src/monitor_tx_pool.rs +++ b/crates/op-rbuilder/src/monitor_tx_pool.rs @@ -1,15 +1,32 @@ use crate::tx::FBPooledTransaction; +use alloy_transport_http::reqwest; use futures_util::StreamExt; +use jsonrpsee::{ + core::{async_trait, RpcResult, SubscriptionResult}, + proc_macros::rpc, + PendingSubscriptionSink, +}; use reth_transaction_pool::{FullTransactionEvent, TransactionPool}; +use ringbuf::{traits::Producer, HeapRb}; +use serde::Serialize; use tracing::info; +/// Ethereum pub-sub rpc interface. +#[rpc(server, namespace = "txpool")] // TODO: Change to internal namespace +pub trait EthPubSubApi { + /// Create an ethereum subscription for the given params + #[subscription(name = "subscribe", item = String)] + async fn sub(&self) -> SubscriptionResult; +} + pub struct TransactionPoolMonitor { pool: Pool, + log_events: bool, } impl TransactionPoolMonitor { - pub fn new(pool: Pool) -> Self { - Self { pool } + pub fn new(pool: Pool, log_events: bool) -> Self { + Self { pool, log_events } } } @@ -18,14 +35,31 @@ where Pool: TransactionPool + Clone + 'static, { pub async fn run(self) { + let mut buffer = HeapRb::new(1000); + tokio::spawn(async move {}); + let mut new_transactions = self.pool.all_transactions_event_listener(); while let Some(event) = new_transactions.next().await { - transaction_event_log(event); + buffer.try_push(event.clone()).unwrap(); + + if self.log_events { + transaction_event_log(event); + } } } } +#[async_trait] +impl EthPubSubApiServer for TransactionPoolMonitor +where + Pool: TransactionPool + Clone + 'static, +{ + async fn sub(&self, pending: PendingSubscriptionSink) -> SubscriptionResult { + todo!() + } +} + fn transaction_event_log(event: FullTransactionEvent) { match event { FullTransactionEvent::Pending(hash) => { From e93dc40953e11f7cc9779e694a1a1c4c496673f8 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Wed, 28 May 2025 13:00:34 +0200 Subject: [PATCH 3/3] More changes --- Cargo.lock | 21 -- crates/op-rbuilder/Cargo.toml | 1 - crates/op-rbuilder/src/args/op.rs | 8 + crates/op-rbuilder/src/main.rs | 31 ++- crates/op-rbuilder/src/monitor_tx_pool.rs | 235 ++++++++++++------ .../src/tests/framework/harness.rs | 4 + crates/op-rbuilder/src/tests/framework/op.rs | 11 + .../op-rbuilder/src/tests/vanilla/txpool.rs | 37 +++ 8 files changed, 235 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 66cd8b3f..4ce9001e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5597,7 +5597,6 @@ dependencies = [ "reth-transaction-pool", "reth-trie", "revm", - "ringbuf", "rollup-boost", "secp256k1", "serde", @@ -6037,15 +6036,6 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" -[[package]] -name = "portable-atomic-util" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" -dependencies = [ - "portable-atomic", -] - [[package]] name = "powerfmt" version = "0.2.0" @@ -9726,17 +9716,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "ringbuf" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe47b720588c8702e34b5979cb3271a8b1842c7cb6f57408efa70c779363488c" -dependencies = [ - "crossbeam-utils", - "portable-atomic", - "portable-atomic-util", -] - [[package]] name = "ringbuffer" version = "0.15.0" diff --git a/crates/op-rbuilder/Cargo.toml b/crates/op-rbuilder/Cargo.toml index d616bfeb..0fd9bd7b 100644 --- a/crates/op-rbuilder/Cargo.toml +++ b/crates/op-rbuilder/Cargo.toml @@ -100,7 +100,6 @@ rand = "0.9.0" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } shellexpand = "3.1" serde_yaml = { version = "0.9" } -ringbuf = "0.4" # `msozin/flashblocks-v1.4.1` branch based on `flashblocks-rebase` rollup-boost = { git = "http://github.com/flashbots/rollup-boost", rev = "8506dfb7d84c65746f7c88d250983658438f59e8" } diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 136e58ad..df6bed65 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -31,6 +31,14 @@ pub struct OpRbuilderArgs { #[arg(long = "builder.log-pool-transactions", default_value = "false")] pub log_pool_transactions: bool, + /// Signals whether to enable the txpool monitor + #[arg(long = "builder.enable-txpool-monitor", default_value = "false")] + pub enable_txpool_monitor: bool, + + /// The buffer size for the txpool events + #[arg(long = "builder.txpool-monitor-buffer-size", default_value = "1000")] + pub txpool_monitor_buffer_size: usize, + /// How much time extra to wait for the block building job to complete and not get garbage collected #[arg(long = "builder.extra-block-deadline-secs", default_value = "20")] pub extra_block_deadline_secs: u64, diff --git a/crates/op-rbuilder/src/main.rs b/crates/op-rbuilder/src/main.rs index 029e805f..89fde075 100644 --- a/crates/op-rbuilder/src/main.rs +++ b/crates/op-rbuilder/src/main.rs @@ -21,7 +21,7 @@ use metrics::{ VersionInfo, BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES, VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA, }; -use monitor_tx_pool::{EthPubSubApiServer, TransactionPoolMonitor}; +use monitor_tx_pool::{TransactionPoolMonitor, TxpoolExtApiServer}; use revert_protection::{EthApiOverrideServer, RevertProtectionExt}; use tx::FBPooledTransaction; @@ -98,39 +98,44 @@ where .build(), ) .extend_rpc_modules(move |ctx| { + let pool = ctx.pool().clone(); + if builder_args.enable_revert_protection { tracing::info!("Revert protection enabled"); - let pool = ctx.pool().clone(); let provider = ctx.provider().clone(); let revert_protection_ext = RevertProtectionExt::new(pool.clone(), provider); - let tx_monitor = TransactionPoolMonitor::new(pool, true); - ctx.modules.merge_configured(tx_monitor.into_rpc())?; - ctx.modules .merge_configured(revert_protection_ext.into_rpc())?; } - Ok(()) - }) - .on_node_started(move |ctx| { - VERSION.register_version_metrics(); - if builder_args.log_pool_transactions { + if builder_args.log_pool_transactions || builder_args.enable_txpool_monitor { tracing::info!("Logging pool transactions"); - /* - ctx.task_executor.spawn_critical( + let tx_monitor = TransactionPoolMonitor::new( + pool, + builder_args.log_pool_transactions, + builder_args.enable_txpool_monitor, + builder_args.txpool_monitor_buffer_size, + ); + ctx.modules.merge_configured(tx_monitor.rpc().into_rpc())?; + + ctx.node().task_executor.spawn_critical( "txlogging", Box::pin(async move { tx_monitor.run().await; }), ); - */ } Ok(()) }) + .on_node_started(move |_ctx| { + VERSION.register_version_metrics(); + + Ok(()) + }) .launch() .await?; diff --git a/crates/op-rbuilder/src/monitor_tx_pool.rs b/crates/op-rbuilder/src/monitor_tx_pool.rs index 9f7664d2..a8d93888 100644 --- a/crates/op-rbuilder/src/monitor_tx_pool.rs +++ b/crates/op-rbuilder/src/monitor_tx_pool.rs @@ -1,32 +1,50 @@ use crate::tx::FBPooledTransaction; -use alloy_transport_http::reqwest; +use alloy_primitives::TxHash; use futures_util::StreamExt; use jsonrpsee::{ - core::{async_trait, RpcResult, SubscriptionResult}, + core::{async_trait, SubscriptionResult}, proc_macros::rpc, - PendingSubscriptionSink, + PendingSubscriptionSink, SubscriptionMessage, }; -use reth_transaction_pool::{FullTransactionEvent, TransactionPool}; -use ringbuf::{traits::Producer, HeapRb}; +use reth_transaction_pool::{FullTransactionEvent, TransactionEvent, TransactionPool}; use serde::Serialize; +use tokio::sync::broadcast; use tracing::info; -/// Ethereum pub-sub rpc interface. -#[rpc(server, namespace = "txpool")] // TODO: Change to internal namespace -pub trait EthPubSubApi { - /// Create an ethereum subscription for the given params - #[subscription(name = "subscribe", item = String)] - async fn sub(&self) -> SubscriptionResult; +#[rpc(server, namespace = "txpool")] +pub trait TxpoolExtApi { + /// Creates a subscription that returns the txpool events. + #[subscription(name = "subscribeEvents", item = usize)] + fn subscribe_events(&self) -> SubscriptionResult; } pub struct TransactionPoolMonitor { pool: Pool, log_events: bool, + txpool_monitor: bool, + event_sender: broadcast::Sender, + // Keep a receiver to prevent channel from closing + _event_receiver: broadcast::Receiver, } impl TransactionPoolMonitor { - pub fn new(pool: Pool, log_events: bool) -> Self { - Self { pool, log_events } + pub fn new(pool: Pool, log_events: bool, txpool_monitor: bool, buffer_size: usize) -> Self { + let (event_sender, _event_receiver) = broadcast::channel(buffer_size); + + if log_events { + info!("Logging pool transactions"); + } + if txpool_monitor { + info!("Monitoring txpool enabled"); + } + + Self { + pool, + log_events, + txpool_monitor, + event_sender, + _event_receiver, + } } } @@ -34,86 +52,147 @@ impl TransactionPoolMonitor where Pool: TransactionPool + Clone + 'static, { - pub async fn run(self) { - let mut buffer = HeapRb::new(1000); - tokio::spawn(async move {}); + pub fn rpc(&self) -> TransactionPoolMonitorRpc { + TransactionPoolMonitorRpc { + event_sender: self.event_sender.clone(), + } + } + pub async fn run(self) { let mut new_transactions = self.pool.all_transactions_event_listener(); while let Some(event) = new_transactions.next().await { - buffer.try_push(event.clone()).unwrap(); - + // Push the event to the buffer + let event_data = TransactionEventData::from(event); if self.log_events { - transaction_event_log(event); + info!( + target = "monitoring", + tx_hash = event_data.hash.to_string(), + kind = event_data.kind(), + "Transaction event received" + ) + } + + if self.txpool_monitor { + println!("Sending event: {:?}", event_data); + let _ = self.event_sender.send(event_data); } } } } +pub struct TransactionPoolMonitorRpc { + event_sender: broadcast::Sender, +} + #[async_trait] -impl EthPubSubApiServer for TransactionPoolMonitor -where - Pool: TransactionPool + Clone + 'static, -{ - async fn sub(&self, pending: PendingSubscriptionSink) -> SubscriptionResult { - todo!() +impl TxpoolExtApiServer for TransactionPoolMonitorRpc { + fn subscribe_events( + &self, + pending_subscription_sink: PendingSubscriptionSink, + ) -> SubscriptionResult { + println!("Subscribing to txpool events"); + let mut event_receiver = self.event_sender.subscribe(); + + tokio::spawn(async move { + let sink = match pending_subscription_sink.accept().await { + Ok(sink) => sink, + Err(e) => { + tracing::warn!("failed to accept subscription: {e}"); + return; + } + }; + + println!("Subscribed to txpool events"); + + loop { + match event_receiver.recv().await { + Ok(event) => { + println!("Received event: {:?}", event); + + let msg = SubscriptionMessage::from( + serde_json::value::to_raw_value(&event) + .expect("Failed to serialize event"), + ); + + if sink.send(msg).await.is_err() { + tracing::debug!("Subscription closed"); + break; + } + } + Err(broadcast::error::RecvError::Lagged(_)) => { + tracing::warn!("Subscription lagged, some events were dropped"); + continue; + } + Err(broadcast::error::RecvError::Closed) => { + tracing::debug!("Event channel closed"); + break; + } + } + } + }); + + Ok(()) } } -fn transaction_event_log(event: FullTransactionEvent) { - match event { - FullTransactionEvent::Pending(hash) => { - info!( - target = "monitoring", - tx_hash = hash.to_string(), - kind = "pending", - "Transaction event received" - ) - } - FullTransactionEvent::Queued(hash) => { - info!( - target = "monitoring", - tx_hash = hash.to_string(), - kind = "queued", - "Transaction event received" - ) - } - FullTransactionEvent::Mined { - tx_hash, - block_hash, - } => info!( - target = "monitoring", - tx_hash = tx_hash.to_string(), - kind = "mined", - block_hash = block_hash.to_string(), - "Transaction event received" - ), - FullTransactionEvent::Replaced { - transaction, - replaced_by, - } => info!( - target = "monitoring", - tx_hash = transaction.hash().to_string(), - kind = "replaced", - replaced_by = replaced_by.to_string(), - "Transaction event received" - ), - FullTransactionEvent::Discarded(hash) => { - info!( - target = "monitoring", - tx_hash = hash.to_string(), - kind = "discarded", - "Transaction event received" - ) +#[derive(Clone, Debug, Serialize)] +struct TransactionEventData { + hash: TxHash, + transaction_event: TransactionEvent, +} + +impl TransactionEventData { + pub fn kind(&self) -> &str { + match self.transaction_event { + TransactionEvent::Pending => "pending", + TransactionEvent::Queued => "queued", + TransactionEvent::Mined(_) => "mined", + TransactionEvent::Replaced(_) => "replaced", + TransactionEvent::Discarded => "discarded", + TransactionEvent::Invalid => "invalid", + TransactionEvent::Propagated(_) => "propagated", } - FullTransactionEvent::Invalid(hash) => { - info!( - target = "monitoring", - tx_hash = hash.to_string(), - kind = "invalid", - "Transaction event received" - ) + } +} + +impl From> for TransactionEventData { + fn from(event: FullTransactionEvent) -> Self { + match event { + FullTransactionEvent::Pending(hash) => Self { + hash, + transaction_event: TransactionEvent::Pending, + }, + FullTransactionEvent::Queued(hash) => Self { + hash, + transaction_event: TransactionEvent::Queued, + }, + FullTransactionEvent::Mined { + tx_hash, + block_hash, + } => Self { + hash: tx_hash, + transaction_event: TransactionEvent::Mined(block_hash), + }, + FullTransactionEvent::Replaced { + transaction, + replaced_by, + } => Self { + hash: *transaction.hash(), + transaction_event: TransactionEvent::Replaced(replaced_by), + }, + FullTransactionEvent::Discarded(hash) => Self { + hash, + transaction_event: TransactionEvent::Discarded, + }, + FullTransactionEvent::Invalid(hash) => Self { + hash, + transaction_event: TransactionEvent::Invalid, + }, + FullTransactionEvent::Propagated(kind) => Self { + hash: TxHash::default(), + transaction_event: TransactionEvent::Propagated(kind), + }, } - FullTransactionEvent::Propagated(_propagated) => {} } } diff --git a/crates/op-rbuilder/src/tests/framework/harness.rs b/crates/op-rbuilder/src/tests/framework/harness.rs index 30fb084e..eb630b14 100644 --- a/crates/op-rbuilder/src/tests/framework/harness.rs +++ b/crates/op-rbuilder/src/tests/framework/harness.rs @@ -86,12 +86,14 @@ impl TestHarnessBuilder { let builder_data_dir: PathBuf = std::env::temp_dir().join(Uuid::new_v4().to_string()); let builder_auth_rpc_port = get_available_port(); let builder_http_port = get_available_port(); + let builder_ws_port = get_available_port(); let mut op_rbuilder_config = OpRbuilderConfig::new() .chain_config_path(genesis_path.clone()) .data_dir(builder_data_dir) .auth_rpc_port(builder_auth_rpc_port) .network_port(get_available_port()) .http_port(builder_http_port) + .ws_port(builder_ws_port) .with_builder_private_key(BUILDER_PRIVATE_KEY) .with_revert_protection(self.use_revert_protection) .with_namespaces(self.namespaces) @@ -131,6 +133,7 @@ impl TestHarnessBuilder { framework: framework, builder_auth_rpc_port, builder_http_port, + builder_ws_port, validator_auth_rpc_port, builder_log_path, }) @@ -141,6 +144,7 @@ pub struct TestHarness { framework: IntegrationFramework, builder_auth_rpc_port: u16, builder_http_port: u16, + pub builder_ws_port: u16, validator_auth_rpc_port: u16, builder_log_path: PathBuf, } diff --git a/crates/op-rbuilder/src/tests/framework/op.rs b/crates/op-rbuilder/src/tests/framework/op.rs index 30aae7a8..e2a30b89 100644 --- a/crates/op-rbuilder/src/tests/framework/op.rs +++ b/crates/op-rbuilder/src/tests/framework/op.rs @@ -21,6 +21,7 @@ pub struct OpRbuilderConfig { chain_config_path: Option, data_dir: Option, http_port: Option, + ws_port: Option, network_port: Option, builder_private_key: Option, flashblocks_port: Option, @@ -61,6 +62,11 @@ impl OpRbuilderConfig { self } + pub fn ws_port(mut self, port: u16) -> Self { + self.ws_port = Some(port); + self + } + pub fn with_builder_private_key(mut self, private_key: &str) -> Self { self.builder_private_key = Some(private_key.to_string()); self @@ -130,6 +136,7 @@ impl Service for OpRbuilderConfig { .arg("--color") .arg("never") .arg("--builder.log-pool-transactions") + .arg("--builder.enable-txpool-monitor") .arg("--port") .arg(self.network_port.expect("network_port not set").to_string()) .arg("--ipcdisable") @@ -152,6 +159,10 @@ impl Service for OpRbuilderConfig { .arg(http_port.to_string()); } + if let Some(ws_port) = self.ws_port { + cmd.arg("--ws").arg("--ws.port").arg(ws_port.to_string()); + } + if let Some(flashblocks_port) = &self.flashblocks_port { cmd.arg("--flashblocks.enabled").arg("true"); cmd.arg("--flashblocks.addr").arg("127.0.0.1"); diff --git a/crates/op-rbuilder/src/tests/vanilla/txpool.rs b/crates/op-rbuilder/src/tests/vanilla/txpool.rs index cbf15512..1c2a607d 100644 --- a/crates/op-rbuilder/src/tests/vanilla/txpool.rs +++ b/crates/op-rbuilder/src/tests/vanilla/txpool.rs @@ -1,5 +1,7 @@ use crate::tests::{framework::TestHarnessBuilder, ONE_ETH}; use alloy_provider::ext::TxPoolApi; +use jsonrpsee::proc_macros::rpc; +use jsonrpsee::ws_client::WsClientBuilder; /// This test ensures that pending pool custom limit is respected and priority tx would be included even when pool if full. #[tokio::test] @@ -67,3 +69,38 @@ async fn pending_pool_limit() -> eyre::Result<()> { Ok(()) } + +#[rpc(client, namespace = "txpool")] +pub trait TxpoolExtApi { + /// Creates a subscription that returns the txpool events. + #[subscription(name = "subscribeEvents", item = usize)] + fn subscribe_events(&self) -> SubscriptionResult; +} + +/// This test ensures that if we enable the txpool monitor, there is a websocket +/// on which we can subscribe and receive txpool events. +#[tokio::test] +async fn txpool_monitor() -> eyre::Result<()> { + let harness = TestHarnessBuilder::new("txpool_monitor") + .with_namespaces("txpool,eth,debug,admin,txpool") + .build() + .await?; + + let ws_url = format!("ws://127.0.0.1:{}", harness.builder_ws_port); + let client = WsClientBuilder::default().build(&ws_url).await.unwrap(); + + // send 10 transactions + for _ in 0..10 { + let tx = harness.create_transaction().send().await?; + println!("tx: {:?}", tx); + } + + // If we subscribe now, we should receive 10 events, one for each tx since they are internally buffered + let mut sub = TxpoolExtApiClient::subscribe_events(&client) + .await + .expect("failed to subscribe"); + + println!("sub: {:?}", sub.next().await); + + Ok(()) +}