Skip to content

Commit 65cb426

Browse files
0x00101010avalonche
authored andcommitted
Update to use op-alloy flashblock types
1 parent bec1d42 commit 65cb426

File tree

5 files changed

+43
-115
lines changed

5 files changed

+43
-115
lines changed

Justfile

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,13 @@ build:
44

55
# Build docker image (debug)
66
build-debug:
7-
docker buildx build --build-arg RELEASE=false -t flashbots/rollup-boost:develop .
7+
docker buildx build --build-arg RELEASE=false -t flashbots/rollup-boost:develop .
8+
9+
clippy:
10+
cargo clippy --workspace -- -D warnings
11+
12+
fmt:
13+
cargo fmt --all
14+
15+
test:
16+
cargo nextest run --workspace

crates/rollup-boost-types/src/flashblocks.rs

Lines changed: 0 additions & 82 deletions
This file was deleted.

crates/rollup-boost/src/flashblocks/inbound.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use futures::{SinkExt, StreamExt};
77
use lru::LruCache;
88
use rollup_boost_types::flashblocks::FlashblocksPayloadV1;
99
use std::io::ErrorKind::TimedOut;
10+
use op_alloy_rpc_types_engine::OpFlashblockPayload;
1011
use std::num::NonZeroUsize;
1112
use std::sync::Arc;
1213
use std::sync::Mutex;
@@ -44,23 +45,23 @@ enum FlashblocksReceiverError {
4445
TaskPanic(String),
4546

4647
#[error("Failed to send message to sender: {0}")]
47-
SendError(#[from] Box<tokio::sync::mpsc::error::SendError<FlashblocksPayloadV1>>),
48+
SendError(#[from] Box<tokio::sync::mpsc::error::SendError<OpFlashblockPayload>>),
4849

4950
#[error("Ping mutex poisoned")]
5051
MutexPoisoned,
5152
}
5253

5354
pub struct FlashblocksReceiverService {
5455
url: Url,
55-
sender: mpsc::Sender<FlashblocksPayloadV1>,
56+
sender: mpsc::Sender<OpFlashblockPayload>,
5657
websocket_config: FlashblocksWebsocketConfig,
5758
metrics: FlashblocksWsInboundMetrics,
5859
}
5960

6061
impl FlashblocksReceiverService {
6162
pub fn new(
6263
url: Url,
63-
sender: mpsc::Sender<FlashblocksPayloadV1>,
64+
sender: mpsc::Sender<OpFlashblockPayload>,
6465
websocket_config: FlashblocksWebsocketConfig,
6566
) -> Self {
6667
Self {
@@ -167,7 +168,7 @@ impl FlashblocksReceiverService {
167168
Some(Ok(msg)) => match msg {
168169
Message::Text(text) => {
169170
metrics.messages_received.increment(1);
170-
match serde_json::from_str::<FlashblocksPayloadV1>(&text) {
171+
match serde_json::from_str::<OpFlashblockPayload>(&text) {
171172
Ok(flashblocks_msg) => sender.send(flashblocks_msg).await.map_err(|e| {
172173
FlashblocksReceiverError::SendError(Box::new(e))
173174
})?,
@@ -254,12 +255,12 @@ mod tests {
254255
addr: SocketAddr,
255256
) -> eyre::Result<(
256257
watch::Sender<bool>,
257-
mpsc::Sender<FlashblocksPayloadV1>,
258+
mpsc::Sender<OpFlashblockPayload>,
258259
mpsc::Receiver<()>,
259260
url::Url,
260261
)> {
261262
let (term_tx, mut term_rx) = watch::channel(false);
262-
let (send_tx, mut send_rx) = mpsc::channel::<FlashblocksPayloadV1>(100);
263+
let (send_tx, mut send_rx) = mpsc::channel::<OpFlashblockPayload>(100);
263264
let (send_ping_tx, send_ping_rx) = mpsc::channel::<()>(100);
264265

265266
let listener = TcpListener::bind(addr)?;
@@ -423,12 +424,12 @@ mod tests {
423424

424425
// Send a message to the websocket server
425426
send_msg
426-
.send(FlashblocksPayloadV1::default())
427+
.send(OpFlashblockPayload::default())
427428
.await
428429
.expect("message sent to websocket server");
429430

430431
let msg = rx.recv().await.expect("message received from websocket");
431-
assert_eq!(msg, FlashblocksPayloadV1::default());
432+
assert_eq!(msg, OpFlashblockPayload::default());
432433

433434
// Drop the websocket server and start another one with the same address
434435
// The FlashblocksReceiverService should reconnect to the new server
@@ -440,12 +441,12 @@ mod tests {
440441
// start a new server with the same address
441442
let (term, send_msg, _, _url) = start(addr).await?;
442443
send_msg
443-
.send(FlashblocksPayloadV1::default())
444+
.send(OpFlashblockPayload::default())
444445
.await
445446
.expect("message sent to websocket server");
446447

447448
let msg = rx.recv().await.expect("message received from websocket");
448-
assert_eq!(msg, FlashblocksPayloadV1::default());
449+
assert_eq!(msg, OpFlashblockPayload::default());
449450
term.send(true).expect("termination signal sent");
450451

451452
Ok(())

crates/rollup-boost/src/flashblocks/outbound.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use core::{
66
task::{Context, Poll},
77
};
88
use futures::{Sink, SinkExt, StreamExt};
9-
use rollup_boost_types::flashblocks::FlashblocksPayloadV1;
9+
use op_alloy_rpc_types_engine::OpFlashblockPayload;
1010
use std::{io, net::TcpListener, sync::Arc};
1111
use tokio::{
1212
net::TcpStream,
@@ -22,7 +22,7 @@ use tokio_tungstenite::{accept_async, tungstenite::Message};
2222
/// A WebSockets publisher that accepts connections from client websockets and broadcasts to them
2323
/// updates about new flashblocks. It maintains a count of sent messages and active subscriptions.
2424
///
25-
/// This is modelled as a `futures::Sink` that can be used to send `FlashblocksPayloadV1` messages.
25+
/// This is modelled as a `futures::Sink` that can be used to send `OpFlashblockPayload` messages.
2626
pub struct WebSocketPublisher {
2727
sent: Arc<AtomicUsize>,
2828
subs: Arc<AtomicUsize>,
@@ -55,7 +55,7 @@ impl WebSocketPublisher {
5555
})
5656
}
5757

58-
pub fn publish(&self, payload: &FlashblocksPayloadV1) -> io::Result<()> {
58+
pub fn publish(&self, payload: &OpFlashblockPayload) -> io::Result<()> {
5959
// Serialize the payload to a UTF-8 string
6060
// serialize only once, then just copy around only a pointer
6161
// to the serialized data for each subscription.
@@ -223,14 +223,14 @@ impl Debug for WebSocketPublisher {
223223
}
224224
}
225225

226-
impl Sink<&FlashblocksPayloadV1> for WebSocketPublisher {
226+
impl Sink<&OpFlashblockPayload> for WebSocketPublisher {
227227
type Error = eyre::Report;
228228

229229
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
230230
Poll::Ready(Ok(()))
231231
}
232232

233-
fn start_send(self: Pin<&mut Self>, item: &FlashblocksPayloadV1) -> Result<(), Self::Error> {
233+
fn start_send(self: Pin<&mut Self>, item: &OpFlashblockPayload) -> Result<(), Self::Error> {
234234
self.publish(item)?;
235235
Ok(())
236236
}

crates/rollup-boost/src/flashblocks/service.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ use op_alloy_rpc_types_engine::{
1313
OpExecutionPayloadEnvelopeV3, OpExecutionPayloadEnvelopeV4, OpExecutionPayloadV4,
1414
OpPayloadAttributes,
1515
};
16-
use rollup_boost_types::flashblocks::{
17-
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashblocksPayloadV1,
16+
use op_alloy_rpc_types_engine::{
17+
OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta,
1818
};
19-
use rollup_boost_types::payload::{NewPayload, OpExecutionPayloadEnvelope, PayloadVersion};
19+
use reth_optimism_payload_builder::payload_id_optimism;
2020
use std::io;
2121
use std::sync::Arc;
2222
use std::sync::atomic::{AtomicU64, Ordering};
@@ -42,21 +42,21 @@ pub enum FlashblocksError {
4242
// Simplify actor messages to just handle shutdown
4343
#[derive(Debug)]
4444
enum FlashblocksEngineMessage {
45-
FlashblocksPayloadV1(FlashblocksPayloadV1),
45+
OpFlashblockPayload(OpFlashblockPayload),
4646
}
4747

4848
#[derive(Clone, Debug, Default)]
4949
pub struct FlashblockBuilder {
50-
base: Option<ExecutionPayloadBaseV1>,
51-
flashblocks: Vec<ExecutionPayloadFlashblockDeltaV1>,
50+
base: Option<OpFlashblockPayloadBase>,
51+
flashblocks: Vec<OpFlashblockPayloadDelta>,
5252
}
5353

5454
impl FlashblockBuilder {
5555
pub fn new() -> Self {
5656
Self::default()
5757
}
5858

59-
pub fn extend(&mut self, payload: FlashblocksPayloadV1) -> Result<(), FlashblocksError> {
59+
pub fn extend(&mut self, payload: OpFlashblockPayload) -> Result<(), FlashblocksError> {
6060
tracing::debug!(message = "Extending payload", payload_id = %payload.payload_id, index = payload.index, has_base=payload.base.is_some());
6161

6262
// Validate the index is contiguous
@@ -239,7 +239,7 @@ impl FlashblocksService {
239239

240240
async fn on_event(&mut self, event: FlashblocksEngineMessage) {
241241
match event {
242-
FlashblocksEngineMessage::FlashblocksPayloadV1(payload) => {
242+
FlashblocksEngineMessage::OpFlashblockPayload(payload) => {
243243
self.metrics.messages_processed.increment(1);
244244

245245
tracing::debug!(
@@ -297,9 +297,9 @@ impl FlashblocksService {
297297
}
298298
}
299299

300-
pub async fn run(&mut self, mut stream: mpsc::Receiver<FlashblocksPayloadV1>) {
300+
pub async fn run(&mut self, mut stream: mpsc::Receiver<OpFlashblockPayload>) {
301301
while let Some(event) = stream.recv().await {
302-
self.on_event(FlashblocksEngineMessage::FlashblocksPayloadV1(event))
302+
self.on_event(FlashblocksEngineMessage::OpFlashblockPayload(event))
303303
.await;
304304
}
305305
}
@@ -461,7 +461,7 @@ mod tests {
461461
let mut builder = FlashblockBuilder::new();
462462

463463
// Error: First payload must have a base
464-
let result = builder.extend(FlashblocksPayloadV1 {
464+
let result = builder.extend(OpFlashblockPayload {
465465
payload_id: PayloadId::default(),
466466
index: 0,
467467
..Default::default()
@@ -470,21 +470,21 @@ mod tests {
470470
assert_eq!(result.unwrap_err(), FlashblocksError::MissingBasePayload);
471471

472472
// Ok: First payload is correct if it has base and index 0
473-
let result = builder.extend(FlashblocksPayloadV1 {
473+
let result = builder.extend(OpFlashblockPayload {
474474
payload_id: PayloadId::default(),
475475
index: 0,
476-
base: Some(ExecutionPayloadBaseV1 {
476+
base: Some(OpFlashblockPayloadBase {
477477
..Default::default()
478478
}),
479479
..Default::default()
480480
});
481481
assert!(result.is_ok());
482482

483483
// Error: First payload must have index 0
484-
let result = builder.extend(FlashblocksPayloadV1 {
484+
let result = builder.extend(OpFlashblockPayload {
485485
payload_id: PayloadId::default(),
486486
index: 1,
487-
base: Some(ExecutionPayloadBaseV1 {
487+
base: Some(OpFlashblockPayloadBase {
488488
..Default::default()
489489
}),
490490
..Default::default()
@@ -493,7 +493,7 @@ mod tests {
493493
assert_eq!(result.unwrap_err(), FlashblocksError::UnexpectedBasePayload);
494494

495495
// Error: Second payload must have a follow-up index
496-
let result = builder.extend(FlashblocksPayloadV1 {
496+
let result = builder.extend(OpFlashblockPayload {
497497
payload_id: PayloadId::default(),
498498
index: 2,
499499
base: None,
@@ -503,7 +503,7 @@ mod tests {
503503
assert_eq!(result.unwrap_err(), FlashblocksError::InvalidIndex);
504504

505505
// Ok: Second payload has the correct index
506-
let result = builder.extend(FlashblocksPayloadV1 {
506+
let result = builder.extend(OpFlashblockPayload {
507507
payload_id: PayloadId::default(),
508508
index: 1,
509509
base: None,

0 commit comments

Comments
 (0)