From 45afc17ec43ff8978742b2ad0d84de69d5b77486 Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Mon, 3 Nov 2025 18:06:16 +0900 Subject: [PATCH 1/4] feat: continuous flashblock building # Conflicts: # crates/op-rbuilder/src/builders/flashblocks/config.rs # crates/op-rbuilder/src/builders/flashblocks/payload.rs --- crates/op-rbuilder/src/args/op.rs | 8 + crates/op-rbuilder/src/builders/context.rs | 6 +- .../src/builders/flashblocks/config.rs | 7 + .../src/builders/flashblocks/payload.rs | 365 +++++++++++++++--- .../src/builders/standard/payload.rs | 2 +- .../src/primitives/reth/execution.rs | 2 +- 6 files changed, 337 insertions(+), 53 deletions(-) diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 4e3d0308..0ecfef25 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -157,6 +157,14 @@ pub struct FlashblocksArgs { )] pub flashblocks_disable_state_root: bool, + /// Should we calculate state root for each flashblock + #[arg( + long = "flashblocks.enable-continuous-building", + default_value = "true", + env = "FLASHBLOCKS_ENABLE_CONTINUOUS_BUILDING" + )] + pub flashblocks_enable_continuous_building: bool, + /// Flashblocks number contract address /// /// This is the address of the contract that will be used to increment the flashblock number. diff --git a/crates/op-rbuilder/src/builders/context.rs b/crates/op-rbuilder/src/builders/context.rs index 1c042fc4..6f21ec2f 100644 --- a/crates/op-rbuilder/src/builders/context.rs +++ b/crates/op-rbuilder/src/builders/context.rs @@ -1,3 +1,4 @@ +use alloy_consensus::transaction::TxHashRef; use alloy_consensus::{Eip658Value, Transaction, conditional::BlockConditionalAttributes}; use alloy_eips::{Encodable2718, Typed2718}; use alloy_evm::Database; @@ -375,10 +376,11 @@ impl OpPayloadBuilderCtx { Ok(info) } - /// Executes the given best transactions and updates the execution info. + /// Simulates the given best transactions. + /// The simulation updates the execution info and commit changes to the db /// /// Returns `Ok(Some(())` if the job was cancelled. - pub(super) fn execute_best_transactions( + pub(super) fn simulate_best_transactions( &self, info: &mut ExecutionInfo, db: &mut State, diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index cdc6ae91..9c46d011 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -34,6 +34,9 @@ pub struct FlashblocksConfig { /// Should we disable state root calculation for each flashblock pub disable_state_root: bool, + /// Whether to enable continuous flashblock building or not + pub enable_continuous_building: bool, + /// The address of the flashblocks number contract. /// /// If set a builder tx will be added to the start of every flashblock instead of the regular builder tx. @@ -66,6 +69,7 @@ impl Default for FlashblocksConfig { leeway_time: Duration::from_millis(50), fixed: false, disable_state_root: false, + enable_continuous_building: true, flashblocks_number_contract_address: None, flashblocks_number_contract_use_permit: false, p2p_enabled: false, @@ -94,6 +98,8 @@ impl TryFrom for FlashblocksConfig { let disable_state_root = args.flashblocks.flashblocks_disable_state_root; + let enable_continuous_building = args.flashblocks.flashblocks_enable_continuous_building; + let flashblocks_number_contract_address = args.flashblocks.flashblocks_number_contract_address; @@ -106,6 +112,7 @@ impl TryFrom for FlashblocksConfig { leeway_time, fixed, disable_state_root, + enable_continuous_building, flashblocks_number_contract_address, flashblocks_number_contract_use_permit, p2p_enabled: args.flashblocks.p2p.p2p_enabled, diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 1d5d1785..ce58e3b7 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -37,9 +37,11 @@ use reth_provider::{ StorageRootProvider, }; use reth_revm::{ - State, database::StateProviderDatabase, db::states::bundle_state::BundleRetention, + State, + database::StateProviderDatabase, + db::{CacheState, TransitionState, states::bundle_state::BundleRetention}, }; -use reth_transaction_pool::TransactionPool; +use reth_transaction_pool::{BestTransactions, TransactionPool}; use reth_trie::{HashedPostState, updates::TrieUpdates}; use revm::Database; use rollup_boost::{ @@ -113,6 +115,14 @@ impl FlashblocksExtraCtx { } } +type FlashblockCandidate = ( + OpBuiltPayload, + FlashblocksPayloadV1, + ExecutionInfo, + CacheState, + Option, +); + impl OpPayloadBuilderCtx { /// Returns the current flashblock index pub(crate) fn flashblock_index(&self) -> u64 { @@ -324,6 +334,8 @@ where let timestamp = config.attributes.timestamp(); let disable_state_root = self.config.specific.disable_state_root; + let enable_continuous_building = self.config.specific.enable_continuous_building; + let ctx = self .get_op_payload_builder_ctx( config.clone(), @@ -479,6 +491,7 @@ where let interval = self.config.specific.interval; let (tx, mut rx) = mpsc::channel((self.config.flashblocks_per_block() + 1) as usize); + // Spawn a task to orchestrate flashblock building jobs per interval tokio::spawn({ let block_cancel = block_cancel.clone(); @@ -492,12 +505,13 @@ where loop { tokio::select! { + // end of interval reached, conclude current flashblock building and start next one _ = timer.tick() => { - // cancel current payload building job + // cancel current flashblock building job fb_cancel.cancel(); fb_cancel = block_cancel.child_token(); - // this will tick at first_flashblock_offset, - // starting the second flashblock + // this will tick at first_flashblock_offset + k * interval, + // starting the next flashblock (k+1) if tx.send(fb_cancel.clone()).await.is_err() { // receiver channel was dropped, return. // this will only happen if the `build_payload` function returns, @@ -538,43 +552,82 @@ where return Ok(()); } - // build first flashblock immediately - let next_flashblocks_ctx = match self - .build_next_flashblock( - &ctx, - &mut info, - &mut state, - &state_provider, - &mut best_txs, - &block_cancel, - &best_payload, - &fb_span, - ) - .await - { - Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, - Ok(None) => { - self.record_flashblocks_metrics( + // build next flashblock immediately + let next_flashblocks_ctx = if enable_continuous_building { + match self + .build_next_flashblock_continuous( &ctx, - &info, - flashblocks_per_block, - &span, - "Payload building complete, job cancelled or target flashblock count reached", - ); - return Ok(()); + &mut info, + &mut state, + &state_provider, + &mut best_txs, + &block_cancel, + &best_payload, + &fb_span, + ) + .await + { + Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, + Ok(None) => { + self.record_flashblocks_metrics( + &ctx, + &info, + flashblocks_per_block, + &span, + "Payload building complete, job cancelled or target flashblock count reached", + ); + return Ok(()); + } + Err(err) => { + error!( + target: "payload_builder", + "Failed to build flashblock {} for block number {}: {}", + ctx.flashblock_index(), + ctx.block_number(), + err + ); + return Err(PayloadBuilderError::Other(err.into())); + } } - Err(err) => { - error!( - target: "payload_builder", - "Failed to build flashblock {} for block number {}: {}", - ctx.flashblock_index(), - ctx.block_number(), - err - ); - return Err(PayloadBuilderError::Other(err.into())); + } else { + match self + .build_next_flashblock( + &ctx, + &mut info, + &mut state, + &state_provider, + &mut best_txs, + &block_cancel, + &best_payload, + &fb_span, + ) + .await + { + Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, + Ok(None) => { + self.record_flashblocks_metrics( + &ctx, + &info, + flashblocks_per_block, + &span, + "Payload building complete, job cancelled or target flashblock count reached", + ); + return Ok(()); + } + Err(err) => { + error!( + target: "payload_builder", + "Failed to build flashblock {} for block number {}: {}", + ctx.flashblock_index(), + ctx.block_number(), + err + ); + return Err(PayloadBuilderError::Other(err.into())); + } } }; + // Wait for next flashblock building job to start, or end of main block building job tokio::select! { Some(fb_cancel) = rx.recv() => { ctx = ctx.with_cancel(fb_cancel).with_extra_ctx(next_flashblocks_ctx); @@ -627,17 +680,13 @@ where ); let flashblock_build_start_time = Instant::now(); - let builder_txs = - match self - .builder_tx - .add_builder_txs(&state_provider, info, ctx, state, true) - { - Ok(builder_txs) => builder_txs, - Err(e) => { - error!(target: "payload_builder", "Error simulating builder txs: {}", e); - vec![] - } - }; + let builder_txs = self + .builder_tx + .add_builder_txs(&state_provider, info, ctx, state, true) + .unwrap_or_else(|e| { + error!(target: "payload_builder", "Error simulating builder txs: {}", e); + vec![] + }); // only reserve builder tx gas / da size that has not been committed yet // committed builder txs would have counted towards the gas / da used @@ -749,7 +798,7 @@ where fb_payload.index = flashblock_index; fb_payload.base = None; - // If main token got canceled in here that means we received get_payload and we should drop everything and now update best_payload + // If main token got canceled in here that means we received get_payload, and we should drop everything and not update best_payload // To ensure that we will return same blocks as rollup-boost (to leverage caches) if block_cancel.is_cancelled() { self.record_flashblocks_metrics( @@ -823,6 +872,224 @@ where } } + /// Takes the current best flashblock candidate, execute new transaction and build a new candidate only if it's better. + /// No state is mutated, but it can be applied later by replacing `state.cache` and `state.transition_state` with the returned `CacheState` and `Option`. + /// The `best_txns` iterator is updated with no updates, it needs to be refreshed again to take into account new mempool transactions. + /// + /// When a new best is found: return `Ok(Some(best))` + /// Else: return the `current` with `Ok(current)` + #[expect(clippy::too_many_arguments)] + fn refresh_best_flashblock_candidate< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + current: Option, + ctx: &OpPayloadBuilderCtx, + info: &ExecutionInfo, + state: &mut State, + state_provider: impl reth::providers::StateProvider + Clone, + best_txs: &mut NextBestFlashblocksTxs, + block_cancel: &CancellationToken, + target_gas_for_batch: u64, + target_da_for_batch: Option, + ) -> eyre::Result> { + // create simulation info and simulation state with same cache and transition state as current state + let mut simulation_info = info.clone(); + let simulation_cache = state.cache.clone(); + let simulation_transition_state = state.transition_state.clone(); + let mut simulation_state = State::builder() + .with_database(StateProviderDatabase::new(&state_provider)) + .with_cached_prestate(simulation_cache) + .with_bundle_update() + .build(); + simulation_state.transition_state = simulation_transition_state; + + // Refresh pool txs + best_txs.refresh_iterator( + BestPayloadTransactions::new( + self.pool + .best_transactions_with_attributes(ctx.best_transaction_attributes()) + .without_updates(), + ), + ctx.flashblock_index(), + ); + + ctx.execute_best_transactions( + &mut simulation_info, + &mut simulation_state, + best_txs, + target_gas_for_batch.min(ctx.block_gas_limit()), + target_da_for_batch, + ) + .wrap_err("failed to execute best transactions")?; + + // Try early return condition + if block_cancel.is_cancelled() { + return Ok(current); + } + + // Add bottom of block builder txs + if let Err(e) = self.builder_tx.add_builder_txs( + &state_provider, + &mut simulation_info, + ctx, + &mut simulation_state, + false, + ) { + error!(target: "payload_builder", "Error simulating builder txs: {}", e); + }; + + // Check if we can build a better block by comparing execution results + let is_better_candidate = |prev: &ExecutionInfo<_>, new: &ExecutionInfo<_>| { + new.cumulative_gas_used > prev.cumulative_gas_used + }; + if current + .as_ref() + .is_some_and(|(_, _, cur, _, _)| !is_better_candidate(cur, &simulation_info)) + { + // Not better, nothing to refresh so we can return early + return Ok(current); + } + + // build block and return new best + build_block( + &mut simulation_state, + ctx, + &mut simulation_info, + ctx.extra_ctx.disable_state_root || ctx.attributes().no_tx_pool, + ) + .map(|(payload, mut fb)| { + fb.index = ctx.flashblock_index(); + fb.base = None; + Some(( + payload, + fb, + simulation_info, + simulation_state.cache, + simulation_state.transition_state, + )) + }) + .wrap_err("failed to build payload") + } + + #[expect(clippy::too_many_arguments)] + async fn build_next_flashblock_continuous< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + ctx: &OpPayloadBuilderCtx, + info: &mut ExecutionInfo, + state: &mut State, + state_provider: impl reth::providers::StateProvider + Clone, + best_txs: &mut NextBestFlashblocksTxs, + block_cancel: &CancellationToken, + best_payload: &BlockCell, + _span: &tracing::Span, + ) -> eyre::Result> { + // 1. --- Prepare shared context --- + + // Add top of block builder txns + let mut target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch; + let mut target_da_for_batch = ctx.extra_ctx.target_da_for_batch; + let builder_txs = self + .builder_tx + .add_builder_txs(&state_provider, info, ctx, state, true) + .unwrap_or_else(|e| { + error!(target: "payload_builder", "Error simulating builder txs: {}", e); + vec![] + }); + + let builder_tx_gas = builder_txs.iter().map(|t| t.gas_used).sum(); + let builder_tx_da_size = builder_txs.iter().map(|t| t.da_size).sum(); + target_gas_for_batch = target_gas_for_batch.saturating_sub(builder_tx_gas); + // saturating sub just in case, we will log an error if da_limit too small for builder_tx_da_size + if let Some(da_limit) = target_da_for_batch.as_mut() { + *da_limit = da_limit.saturating_sub(builder_tx_da_size); + } + + let mut best: Option = None; + + // 2. --- Build candidates and update best --- + loop { + // If main token got canceled in here that means we received get_payload, and we should drop everything and not update best_payload + // To ensure that we will return same blocks as rollup-boost (to leverage caches) + if block_cancel.is_cancelled() { + return Ok(None); + } + // interval end: abort worker and publish current best immediately (below) + if ctx.cancel.is_cancelled() { + break; + } + + // Build one candidate + best = self.refresh_best_flashblock_candidate( + best, + ctx, + &*info, + state, + &state_provider, + best_txs, + block_cancel, + target_gas_for_batch, + target_da_for_batch, + )?; + } + + // if we weren't able to build a single best payload before this point + // then we should drop everything + if best.is_none() { + warn!("Didn't build any best candidate"); + return Ok(None); + } + + // 3. --- Cancellation token received, send best --- + let (payload, fb_payload, execution_info, cache_state, transition_state) = + best.expect("we checked best.is_none and returned early"); + + // Apply state mutations from best + state.cache = cache_state; + state.transition_state = transition_state; + + // Send payloads + let _flashblock_byte_size = self + .ws_pub + .publish(&fb_payload) + .wrap_err("failed to publish flashblock via websocket")?; + self.payload_tx + .send(payload.clone()) + .await + .wrap_err("failed to send built payload to handler")?; + best_payload.set(payload); + + // update execution info + *info = execution_info; + + // Mark selected transactions as commited + let batch_new_transactions = info.executed_transactions[info.extra.last_flashblock_index..] + .to_vec() + .iter() + .map(|tx| tx.tx_hash()) + .collect::>(); + // warn: it also marks the top of blocks builder_txs + best_txs.mark_commited(batch_new_transactions); + + // Update context for next iteration + let target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch + ctx.extra_ctx.gas_per_batch; + let target_da_for_batch = ctx + .extra_ctx + .da_per_batch + .zip(ctx.extra_ctx.target_da_for_batch) + .map(|(da_limit, da)| da + da_limit); + + let next_extra_ctx = ctx + .extra_ctx + .clone() + .next(target_gas_for_batch, target_da_for_batch); + Ok(Some(next_extra_ctx)) + } + /// Do some logging and metric recording when we stop build flashblocks fn record_flashblocks_metrics( &self, diff --git a/crates/op-rbuilder/src/builders/standard/payload.rs b/crates/op-rbuilder/src/builders/standard/payload.rs index f8bb1b58..45457042 100644 --- a/crates/op-rbuilder/src/builders/standard/payload.rs +++ b/crates/op-rbuilder/src/builders/standard/payload.rs @@ -407,7 +407,7 @@ impl OpBuilder<'_, Txs> { .set(transaction_pool_fetch_time); if ctx - .execute_best_transactions( + .simulate_best_transactions( &mut info, db, &mut best_txs, diff --git a/crates/op-rbuilder/src/primitives/reth/execution.rs b/crates/op-rbuilder/src/primitives/reth/execution.rs index 7865a1c8..e2bc7850 100644 --- a/crates/op-rbuilder/src/primitives/reth/execution.rs +++ b/crates/op-rbuilder/src/primitives/reth/execution.rs @@ -24,7 +24,7 @@ pub enum TxnExecutionResult { MaxGasUsageExceeded, } -#[derive(Default, Debug)] +#[derive(Clone, Default, Debug)] pub struct ExecutionInfo { /// All executed transactions (unrecovered). pub executed_transactions: Vec, From 6840e9269ee6dd4b52605059f37dd766d0b74234 Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Tue, 4 Nov 2025 01:44:00 +0900 Subject: [PATCH 2/4] feat: add metrics --- crates/op-rbuilder/src/builders/context.rs | 5 +- .../src/builders/flashblocks/payload.rs | 110 +++++++++++++++--- 2 files changed, 95 insertions(+), 20 deletions(-) diff --git a/crates/op-rbuilder/src/builders/context.rs b/crates/op-rbuilder/src/builders/context.rs index 6f21ec2f..e8b667b4 100644 --- a/crates/op-rbuilder/src/builders/context.rs +++ b/crates/op-rbuilder/src/builders/context.rs @@ -1,6 +1,7 @@ -use alloy_consensus::transaction::TxHashRef; -use alloy_consensus::{Eip658Value, Transaction, conditional::BlockConditionalAttributes}; use alloy_eips::{Encodable2718, Typed2718}; +use alloy_consensus::{ + Eip658Value, Transaction, conditional::BlockConditionalAttributes, transaction::TxHashRef, +}; use alloy_evm::Database; use alloy_op_evm::block::receipt_builder::OpReceiptBuilder; use alloy_primitives::{BlockHash, Bytes, U256}; diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index ce58e3b7..632adf24 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -906,6 +906,7 @@ where simulation_state.transition_state = simulation_transition_state; // Refresh pool txs + let best_txs_start_time = Instant::now(); best_txs.refresh_iterator( BestPayloadTransactions::new( self.pool @@ -914,7 +915,15 @@ where ), ctx.flashblock_index(), ); + let transaction_pool_fetch_time = best_txs_start_time.elapsed(); + ctx.metrics + .transaction_pool_fetch_duration + .record(transaction_pool_fetch_time); + ctx.metrics + .transaction_pool_fetch_gauge + .set(transaction_pool_fetch_time); + let tx_execution_start_time = Instant::now(); ctx.execute_best_transactions( &mut simulation_info, &mut simulation_state, @@ -923,6 +932,13 @@ where target_da_for_batch, ) .wrap_err("failed to execute best transactions")?; + let payload_transaction_simulation_time = tx_execution_start_time.elapsed(); + ctx.metrics + .payload_transaction_simulation_duration + .record(payload_transaction_simulation_time); + ctx.metrics + .payload_transaction_simulation_gauge + .set(payload_transaction_simulation_time); // Try early return condition if block_cancel.is_cancelled() { @@ -953,24 +969,41 @@ where } // build block and return new best - build_block( + let total_block_built_duration = Instant::now(); + + let build_result = build_block( &mut simulation_state, ctx, &mut simulation_info, ctx.extra_ctx.disable_state_root || ctx.attributes().no_tx_pool, - ) - .map(|(payload, mut fb)| { - fb.index = ctx.flashblock_index(); - fb.base = None; - Some(( - payload, - fb, - simulation_info, - simulation_state.cache, - simulation_state.transition_state, - )) - }) - .wrap_err("failed to build payload") + ); + + let total_block_built_duration = total_block_built_duration.elapsed(); + ctx.metrics + .total_block_built_duration + .record(total_block_built_duration); + ctx.metrics + .total_block_built_gauge + .set(total_block_built_duration); + + match build_result { + Err(err) => { + ctx.metrics.invalid_built_blocks_count.increment(1); + Err(err).wrap_err("failed to build payload") + } + Ok((payload, mut fb)) => { + fb.index = ctx.flashblock_index(); + fb.base = None; + + Ok(Some(( + payload, + fb, + simulation_info, + simulation_state.cache, + simulation_state.transition_state, + ))) + } + } } #[expect(clippy::too_many_arguments)] @@ -986,13 +1019,27 @@ where best_txs: &mut NextBestFlashblocksTxs, block_cancel: &CancellationToken, best_payload: &BlockCell, - _span: &tracing::Span, + span: &tracing::Span, ) -> eyre::Result> { // 1. --- Prepare shared context --- - // Add top of block builder txns + let flashblock_index = ctx.flashblock_index(); let mut target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch; let mut target_da_for_batch = ctx.extra_ctx.target_da_for_batch; + info!( + target: "payload_builder", + block_number = ctx.block_number(), + flashblock_index, + target_gas = target_gas_for_batch, + gas_used = info.cumulative_gas_used, + target_da = target_da_for_batch, + da_used = info.cumulative_da_bytes_used, + block_gas_used = ctx.block_gas_limit(), + "Building flashblock", + ); + let flashblock_build_start_time = Instant::now(); + + // Add top of block builder txns let builder_txs = self .builder_tx .add_builder_txs(&state_provider, info, ctx, state, true) @@ -1016,6 +1063,13 @@ where // If main token got canceled in here that means we received get_payload, and we should drop everything and not update best_payload // To ensure that we will return same blocks as rollup-boost (to leverage caches) if block_cancel.is_cancelled() { + self.record_flashblocks_metrics( + ctx, + info, + ctx.target_flashblock_count(), + span, + "Payload building complete, channel closed or job cancelled", + ); return Ok(None); } // interval end: abort worker and publish current best immediately (below) @@ -1053,7 +1107,7 @@ where state.transition_state = transition_state; // Send payloads - let _flashblock_byte_size = self + let flashblock_byte_size = self .ws_pub .publish(&fb_payload) .wrap_err("failed to publish flashblock via websocket")?; @@ -1072,9 +1126,19 @@ where .iter() .map(|tx| tx.tx_hash()) .collect::>(); - // warn: it also marks the top of blocks builder_txs best_txs.mark_commited(batch_new_transactions); + // Record flashblock build duration + ctx.metrics + .flashblock_build_duration + .record(flashblock_build_start_time.elapsed()); + ctx.metrics + .flashblock_byte_size_histogram + .record(flashblock_byte_size as f64); + ctx.metrics + .flashblock_num_tx_histogram + .record(info.executed_transactions.len() as f64); + // Update context for next iteration let target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch + ctx.extra_ctx.gas_per_batch; let target_da_for_batch = ctx @@ -1087,6 +1151,16 @@ where .extra_ctx .clone() .next(target_gas_for_batch, target_da_for_batch); + + info!( + target: "payload_builder", + message = "Flashblock built", + flashblock_index, + current_gas = info.cumulative_gas_used, + current_da = info.cumulative_da_bytes_used, + target_flashblocks = ctx.target_flashblock_count(), + ); + Ok(Some(next_extra_ctx)) } From e3a1b331a5fb80d33af53705165b631f8bc07c7b Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Tue, 4 Nov 2025 22:09:41 +0900 Subject: [PATCH 3/4] fix: clippy --- crates/op-rbuilder/src/builders/context.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/op-rbuilder/src/builders/context.rs b/crates/op-rbuilder/src/builders/context.rs index e8b667b4..7bba2bba 100644 --- a/crates/op-rbuilder/src/builders/context.rs +++ b/crates/op-rbuilder/src/builders/context.rs @@ -429,8 +429,7 @@ impl OpPayloadBuilderCtx { // Note that we need to use the Option to signal whether the transaction comes from a bundle, // otherwise, we would exclude all transactions that are not in the reverted hashes. let is_bundle_tx = reverted_hashes.is_some(); - let exclude_reverting_txs = - is_bundle_tx && !reverted_hashes.unwrap().contains(&tx_hash); + let exclude_reverting_txs = is_bundle_tx && !reverted_hashes.unwrap().contains(tx_hash); let log_txn = |result: TxnExecutionResult| { debug!( From 675653964fa5cb45674e43e905f79001fb1abe15 Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Tue, 2 Dec 2025 17:17:40 -0500 Subject: [PATCH 4/4] add da --- crates/op-rbuilder/src/builders/context.rs | 7 ++- .../src/builders/flashblocks/payload.rs | 52 +++++++++++++++---- .../src/builders/standard/payload.rs | 2 +- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/crates/op-rbuilder/src/builders/context.rs b/crates/op-rbuilder/src/builders/context.rs index 7bba2bba..fa15131c 100644 --- a/crates/op-rbuilder/src/builders/context.rs +++ b/crates/op-rbuilder/src/builders/context.rs @@ -1,7 +1,7 @@ -use alloy_eips::{Encodable2718, Typed2718}; use alloy_consensus::{ Eip658Value, Transaction, conditional::BlockConditionalAttributes, transaction::TxHashRef, }; +use alloy_eips::{Encodable2718, Typed2718}; use alloy_evm::Database; use alloy_op_evm::block::receipt_builder::OpReceiptBuilder; use alloy_primitives::{BlockHash, Bytes, U256}; @@ -377,11 +377,10 @@ impl OpPayloadBuilderCtx { Ok(info) } - /// Simulates the given best transactions. - /// The simulation updates the execution info and commit changes to the db + /// Executes the given best transactions and updates the execution info. /// /// Returns `Ok(Some(())` if the job was cancelled. - pub(super) fn simulate_best_transactions( + pub(super) fn execute_best_transactions( &self, info: &mut ExecutionInfo, db: &mut State, diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 632adf24..15e45ab6 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -893,8 +893,9 @@ where block_cancel: &CancellationToken, target_gas_for_batch: u64, target_da_for_batch: Option, + target_da_footprint_for_batch: Option, ) -> eyre::Result> { - // create simulation info and simulation state with same cache and transition state as current state + // create simulation info and simulation state with the same cache and transition state as the current state let mut simulation_info = info.clone(); let simulation_cache = state.cache.clone(); let simulation_transition_state = state.transition_state.clone(); @@ -930,6 +931,7 @@ where best_txs, target_gas_for_batch.min(ctx.block_gas_limit()), target_da_for_batch, + target_da_footprint_for_batch, ) .wrap_err("failed to execute best transactions")?; let payload_transaction_simulation_time = tx_execution_start_time.elapsed(); @@ -1026,6 +1028,8 @@ where let flashblock_index = ctx.flashblock_index(); let mut target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch; let mut target_da_for_batch = ctx.extra_ctx.target_da_for_batch; + let mut target_da_footprint_for_batch = ctx.extra_ctx.target_da_footprint_for_batch; + info!( target: "payload_builder", block_number = ctx.block_number(), @@ -1035,6 +1039,7 @@ where target_da = target_da_for_batch, da_used = info.cumulative_da_bytes_used, block_gas_used = ctx.block_gas_limit(), + target_da_footprint = target_da_footprint_for_batch, "Building flashblock", ); let flashblock_build_start_time = Instant::now(); @@ -1048,27 +1053,43 @@ where vec![] }); - let builder_tx_gas = builder_txs.iter().map(|t| t.gas_used).sum(); - let builder_tx_da_size = builder_txs.iter().map(|t| t.da_size).sum(); + // only reserve builder tx gas / da size that has not been committed yet + // committed builder txs would have counted towards the gas / da used + let builder_tx_gas = builder_txs + .iter() + .filter(|tx| !tx.is_top_of_block) + .fold(0, |acc, tx| acc + tx.gas_used); + let builder_tx_da_size: u64 = builder_txs + .iter() + .filter(|tx| !tx.is_top_of_block) + .fold(0, |acc, tx| acc + tx.da_size); target_gas_for_batch = target_gas_for_batch.saturating_sub(builder_tx_gas); + // saturating sub just in case, we will log an error if da_limit too small for builder_tx_da_size if let Some(da_limit) = target_da_for_batch.as_mut() { *da_limit = da_limit.saturating_sub(builder_tx_da_size); } + if let (Some(footprint), Some(scalar)) = ( + target_da_footprint_for_batch.as_mut(), + info.da_footprint_scalar, + ) { + *footprint = footprint.saturating_sub(builder_tx_da_size.saturating_mul(scalar as u64)); + } + let mut best: Option = None; // 2. --- Build candidates and update best --- loop { - // If main token got canceled in here that means we received get_payload, and we should drop everything and not update best_payload - // To ensure that we will return same blocks as rollup-boost (to leverage caches) + // If the main token got canceled in here, that means we received get_payload, and we should drop everything and not update best_payload + // To ensure that we will return the same blocks as rollup-boost (to leverage caches) if block_cancel.is_cancelled() { self.record_flashblocks_metrics( ctx, info, ctx.target_flashblock_count(), span, - "Payload building complete, channel closed or job cancelled", + "Payload building complete, channel closed or job canceled", ); return Ok(None); } @@ -1088,6 +1109,7 @@ where block_cancel, target_gas_for_batch, target_da_for_batch, + target_da_footprint_for_batch, )?; } @@ -1139,7 +1161,7 @@ where .flashblock_num_tx_histogram .record(info.executed_transactions.len() as f64); - // Update context for next iteration + // Update context for the next iteration let target_gas_for_batch = ctx.extra_ctx.target_gas_for_batch + ctx.extra_ctx.gas_per_batch; let target_da_for_batch = ctx .extra_ctx @@ -1147,10 +1169,18 @@ where .zip(ctx.extra_ctx.target_da_for_batch) .map(|(da_limit, da)| da + da_limit); - let next_extra_ctx = ctx - .extra_ctx - .clone() - .next(target_gas_for_batch, target_da_for_batch); + if let (Some(footprint), Some(da_footprint_limit)) = ( + target_da_footprint_for_batch.as_mut(), + ctx.extra_ctx.da_footprint_per_batch, + ) { + *footprint += da_footprint_limit; + } + + let next_extra_ctx = ctx.extra_ctx.clone().next( + target_gas_for_batch, + target_da_for_batch, + target_da_footprint_for_batch, + ); info!( target: "payload_builder", diff --git a/crates/op-rbuilder/src/builders/standard/payload.rs b/crates/op-rbuilder/src/builders/standard/payload.rs index 45457042..f8bb1b58 100644 --- a/crates/op-rbuilder/src/builders/standard/payload.rs +++ b/crates/op-rbuilder/src/builders/standard/payload.rs @@ -407,7 +407,7 @@ impl OpBuilder<'_, Txs> { .set(transaction_pool_fetch_time); if ctx - .simulate_best_transactions( + .execute_best_transactions( &mut info, db, &mut best_txs,