From 6a919e82cb706ddf308eb317fb466ec747b2a37d Mon Sep 17 00:00:00 2001 From: 0xflashboy Date: Fri, 23 Jan 2026 18:11:13 +0000 Subject: [PATCH 1/3] Add health check API --- Cargo.lock | 83 ++++++++++++++++++ backend/Cargo.toml | 3 + backend/Dockerfile | 3 +- backend/Dockerfile_build_and_publish | 3 +- backend/src/bin/backend.rs | 9 +- backend/src/lib/server.rs | 122 ++++++++++++++++++++++++--- docker-compose.yml | 8 +- 7 files changed, 214 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f6532ba..cd1ff7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -308,6 +308,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "auto_impl" version = "1.3.0" @@ -787,6 +793,9 @@ dependencies = [ "chrono", "clap", "futures-util", + "http-body-util", + "hyper", + "hyper-util", "lazy_static", "monad-event-ring", "monad-exec-events", @@ -872,6 +881,15 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", +] + [[package]] name = "futures-core" version = "0.3.31" @@ -1014,12 +1032,77 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" 
+version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "hyper" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "pin-utils", + "smallvec", + "tokio", +] + +[[package]] +name = "hyper-util" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "hyper", + "pin-project-lite", + "tokio", +] + [[package]] name = "iana-time-zone" version = "0.1.64" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index ff76e4f..c1b5732 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -16,6 +16,9 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } alloy-primitives = { version = "0.8", features = ["serde"] } anyhow = "1.0.100" +hyper = { version = "1.5", features = ["server", "http1"] } +hyper-util = { version = "0.1", features = ["tokio"] } +http-body-util = "0.1" [dependencies.monad-exec-events] git = "https://github.com/category-labs/monad-bft" diff --git 
a/backend/Dockerfile b/backend/Dockerfile index 114a47e..6d7e59a 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -25,7 +25,8 @@ RUN cargo build --release --bin backend # Expose WebSocket port EXPOSE 8443 +EXPOSE 443 # Set entrypoint with default server address for container ENTRYPOINT ["cargo", "run", "--release", "--bin", "backend", "--"] -CMD ["--server-addr", "0.0.0.0:8443"] +CMD ["--server-addr", "0.0.0.0:8443", "--health-server-addr", "0.0.0.0:443"] diff --git a/backend/Dockerfile_build_and_publish b/backend/Dockerfile_build_and_publish index 93a95f0..ac157fa 100644 --- a/backend/Dockerfile_build_and_publish +++ b/backend/Dockerfile_build_and_publish @@ -46,7 +46,8 @@ COPY --from=builder /usr/src/app/restricted_filters.json /app/restricted_filters # Expose WebSocket port EXPOSE 8443 +EXPOSE 443 # Set entrypoint using absolute path or relative to WORKDIR ENTRYPOINT ["./backend"] -CMD ["--server-addr", "0.0.0.0:8443"] +CMD ["--server-addr", "0.0.0.0:8443", "--health-server-addr", "0.0.0.0:443"] diff --git a/backend/src/bin/backend.rs b/backend/src/bin/backend.rs index 5b3b18c..48b0047 100644 --- a/backend/src/bin/backend.rs +++ b/backend/src/bin/backend.rs @@ -18,6 +18,9 @@ pub struct Cli { #[arg(short, long, default_value = "127.0.0.1:3000")] server_addr: String, + + #[arg(long, default_value = "127.0.0.1:4000")] + health_server_addr: String, } #[tokio::main] @@ -33,6 +36,7 @@ async fn main() -> Result<(), Box> { let Cli { event_ring_path, server_addr, + health_server_addr, } = Cli::parse(); // Resolve the event ring path @@ -47,12 +51,13 @@ async fn main() -> Result<(), Box> { // Spawn the event listener thread let listener_handle = event_listener::run_event_listener(event_ring_path, event_sender); - // Parse server address + // Parse server addresses let addr: SocketAddr = server_addr.parse()?; + let health_addr: SocketAddr = health_server_addr.parse()?; // Run both tasks and exit when either completes tokio::select! 
{ - result = server::run_websocket_server(addr, event_receiver) => { + result = server::run_servers(addr, health_addr, event_receiver) => { warn!("WebSocket server stopped: {:?}", result); } _ = tokio::task::spawn_blocking(move || listener_handle.join()) => { diff --git a/backend/src/lib/server.rs b/backend/src/lib/server.rs index d126f14..5c8f9cd 100644 --- a/backend/src/lib/server.rs +++ b/backend/src/lib/server.rs @@ -1,6 +1,14 @@ use std::net::SocketAddr; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use alloy_primitives::{Address, B256}; +use http_body_util::Full; +use hyper::body::Bytes; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response, StatusCode}; use futures_util::stream::SplitSink; use futures_util::{stream::SplitStream, SinkExt, StreamExt}; use monad_exec_events::ExecEvent; @@ -18,6 +26,9 @@ use super::event_filter::EventFilter; use super::event_listener::EventData; use super::serializable_event::SerializableEventData; +/// Stores the Unix timestamp (in seconds) of the last event received from the ring +type LastEventTime = Arc; + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TopAccessesData { pub account: Vec>, @@ -188,6 +199,7 @@ async fn client_read_task( async fn run_event_forwarder_task( mut event_receiver: tokio::sync::mpsc::Receiver, event_broadcast_sender: broadcast::Sender, + last_event_time: LastEventTime, ) { let mut account_accesses = TopKTracker::new(1_000); let mut storage_accesses = TopKTracker::new(1_000); @@ -198,6 +210,7 @@ async fn run_event_forwarder_task( let mut tps_tracker = TPSTracker::new(); + let kill_time = std::time::Instant::now() + std::time::Duration::from_secs(10); loop { tokio::select! 
{ event_data = event_receiver.recv() => { @@ -207,6 +220,17 @@ async fn run_event_forwarder_task( } let mut event_data = event_data.unwrap(); + // Update last event timestamp for health check + let now_secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + if std::time::Instant::now() > kill_time { + error!("Killing event forwarder time update"); + continue; + } + last_event_time.store(now_secs, Ordering::Relaxed); + // Track txn_hash from TxnHeaderStart events if let EventName::TxnHeaderStart = event_data.event_name { if let ExecEvent::TxnHeaderStart { txn_index, txn_header_start, .. } = &event_data.payload { @@ -341,20 +365,58 @@ async fn handle_connection( info!("WebSocket connection closed: {}", addr); } -pub async fn run_websocket_server( - server_addr: SocketAddr, - event_receiver: tokio::sync::mpsc::Receiver, -) -> Result<(), Box> { - // Create a broadcast channel for distributing events to all clients - let (event_broadcast_sender, _) = broadcast::channel::(1_000_000); +async fn health_handler( + last_event_time: LastEventTime, +) -> Result>, hyper::Error> { + let now_secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let last_event = last_event_time.load(Ordering::Relaxed); + let is_healthy = now_secs.saturating_sub(last_event) <= 10; + + let body = if is_healthy { + r#"{"success": true}"# + } else { + r#"{"success": false}"# + }; - // Spawn a task to forward events from the mpsc channel to the broadcast channel - let event_broadcast_sender_clone = event_broadcast_sender.clone(); - let _ = tokio::spawn(run_event_forwarder_task( - event_receiver, - event_broadcast_sender_clone, - )); + Ok(Response::builder() + .status(if is_healthy { StatusCode::OK } else { StatusCode::SERVICE_UNAVAILABLE }) // 503 when stale so status-only probes see the failure + .header("Content-Type", "application/json") + .body(Full::new(Bytes::from(body))) + .unwrap()) +} + +async fn run_health_server( + health_addr: SocketAddr, + last_event_time: LastEventTime, +) -> Result<(), Box> { + let listener =
tokio::net::TcpListener::bind(health_addr).await?; + info!("Health server listening on: {}", health_addr); + loop { + let (stream, _) = listener.accept().await?; + let io = hyper_util::rt::TokioIo::new(stream); + let last_event_time = last_event_time.clone(); + + tokio::spawn(async move { + let service = service_fn(move |_req: Request| { + let last_event_time = last_event_time.clone(); + async move { health_handler(last_event_time).await } + }); + + if let Err(e) = http1::Builder::new().serve_connection(io, service).await { + error!("Health server connection error: {}", e); + } + }); + } +} + +async fn run_websocket_server( + server_addr: SocketAddr, + event_broadcast_sender: broadcast::Sender, +) -> Result<(), Box> { // Bind the TCP listener let listener = TcpListener::bind(&server_addr).await?; info!("WebSocket server listening on: {}", server_addr); @@ -380,3 +442,39 @@ pub async fn run_websocket_server( } } } + +pub async fn run_servers( + server_addr: SocketAddr, + health_server_addr: SocketAddr, + event_receiver: tokio::sync::mpsc::Receiver, +) -> Result<(), Box> { + // Create shared state for tracking last event time (for health checks) + let last_event_time: LastEventTime = Arc::new(AtomicU64::new(0)); + + // Create a broadcast channel for distributing events to all clients + let (event_broadcast_sender, _) = broadcast::channel::(1_000_000); + + // Spawn a task to forward events from the mpsc channel to the broadcast channel + let event_broadcast_sender_clone = event_broadcast_sender.clone(); + let last_event_time_clone = last_event_time.clone(); + tokio::spawn(run_event_forwarder_task( + event_receiver, + event_broadcast_sender_clone, + last_event_time_clone, + )); + + // Spawn both servers and wait for either to complete + let websocket_task = tokio::spawn(run_websocket_server(server_addr, event_broadcast_sender)); + let health_task = tokio::spawn(run_health_server(health_server_addr, last_event_time)); + + tokio::select! 
{ + result = websocket_task => { + error!("WebSocket server task stopped: {:?}", result); + } + result = health_task => { + error!("Health server task stopped: {:?}", result); + } + } + + Ok(()) +} diff --git a/docker-compose.yml b/docker-compose.yml index 66cb1f1..e75cd0a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,7 +16,13 @@ services: # Optional: Persistent logs volume - logs:/var/log/eventwatch restart: unless-stopped - command: ["--server-addr", "0.0.0.0:8443"] + command: ["--server-addr", "0.0.0.0:8443", "--health-server-addr", "0.0.0.0:443"] + healthcheck: + test: ["CMD-SHELL", "curl -sf http://127.0.0.1:443/health | grep -q '\"success\": true'"] # probe loopback; 0.0.0.0 is a bind address, not a destination + interval: 10s + timeout: 5s + retries: 3 + start_period: 10s logging: driver: json-file options: From a74ab92557b8cdfd5c1f5acd1516c31a7160e0a2 Mon Sep 17 00:00:00 2001 From: 0xflashboy Date: Fri, 23 Jan 2026 18:12:41 +0000 Subject: [PATCH 2/3] remove kill time --- backend/src/lib/server.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/backend/src/lib/server.rs b/backend/src/lib/server.rs index 5c8f9cd..6e54da1 100644 --- a/backend/src/lib/server.rs +++ b/backend/src/lib/server.rs @@ -210,7 +210,6 @@ async fn run_event_forwarder_task( let mut tps_tracker = TPSTracker::new(); - let kill_time = std::time::Instant::now() + std::time::Duration::from_secs(10); loop { tokio::select!
{ event_data = event_receiver.recv() => { @@ -225,10 +224,6 @@ async fn run_event_forwarder_task( .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); - if std::time::Instant::now() > kill_time { - error!("Killing event forwarder time update"); - continue; - } last_event_time.store(now_secs, Ordering::Relaxed); // Track txn_hash from TxnHeaderStart events From 340a905ee4125e6f7c8a37a2dea34a0c07571596 Mon Sep 17 00:00:00 2001 From: 0xflashboy Date: Fri, 23 Jan 2026 18:14:27 +0000 Subject: [PATCH 3/3] log health check requests --- backend/src/lib/server.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/src/lib/server.rs b/backend/src/lib/server.rs index 6e54da1..8aa9935 100644 --- a/backend/src/lib/server.rs +++ b/backend/src/lib/server.rs @@ -371,8 +371,10 @@ async fn health_handler( let is_healthy = now_secs.saturating_sub(last_event) <= 10; let body = if is_healthy { + info!("Health check passed"); r#"{"success": true}"# } else { + warn!("Health check failed - last event time: {} seconds ago", now_secs.saturating_sub(last_event)); r#"{"success": false}"# };