Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
alloy-primitives = { version = "0.8", features = ["serde"] }
anyhow = "1.0.100"
hyper = { version = "1.5", features = ["server", "http1"] }
hyper-util = { version = "0.1", features = ["tokio"] }
http-body-util = "0.1"

[dependencies.monad-exec-events]
git = "https://github.com/category-labs/monad-bft"
Expand Down
3 changes: 2 additions & 1 deletion backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ RUN cargo build --release --bin backend

# Expose WebSocket port
EXPOSE 8443
EXPOSE 443

# Set entrypoint with default server address for container
ENTRYPOINT ["cargo", "run", "--release", "--bin", "backend", "--"]
CMD ["--server-addr", "0.0.0.0:8443"]
CMD ["--server-addr", "0.0.0.0:8443", "--health-server-addr", "0.0.0.0:443"]
3 changes: 2 additions & 1 deletion backend/Dockerfile_build_and_publish
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ COPY --from=builder /usr/src/app/restricted_filters.json /app/restricted_filters

# Expose WebSocket port
EXPOSE 8443
EXPOSE 443

# Set entrypoint using absolute path or relative to WORKDIR
ENTRYPOINT ["./backend"]
CMD ["--server-addr", "0.0.0.0:8443"]
CMD ["--server-addr", "0.0.0.0:8443", "--health-server-addr", "0.0.0.0:443"]
9 changes: 7 additions & 2 deletions backend/src/bin/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub struct Cli {

#[arg(short, long, default_value = "127.0.0.1:3000")]
server_addr: String,

#[arg(long, default_value = "127.0.0.1:4000")]
health_server_addr: String,
}

#[tokio::main]
Expand All @@ -33,6 +36,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let Cli {
event_ring_path,
server_addr,
health_server_addr,
} = Cli::parse();

// Resolve the event ring path
Expand All @@ -47,12 +51,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Spawn the event listener thread
let listener_handle = event_listener::run_event_listener(event_ring_path, event_sender);

// Parse server address
// Parse server addresses
let addr: SocketAddr = server_addr.parse()?;
let health_addr: SocketAddr = health_server_addr.parse()?;

// Run both tasks and exit when either completes
tokio::select! {
result = server::run_websocket_server(addr, event_receiver) => {
result = server::run_servers(addr, health_addr, event_receiver) => {
warn!("WebSocket server stopped: {:?}", result);
}
_ = tokio::task::spawn_blocking(move || listener_handle.join()) => {
Expand Down
119 changes: 107 additions & 12 deletions backend/src/lib/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use alloy_primitives::{Address, B256};
use http_body_util::Full;
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response, StatusCode};
use futures_util::stream::SplitSink;
use futures_util::{stream::SplitStream, SinkExt, StreamExt};
use monad_exec_events::ExecEvent;
Expand All @@ -18,6 +26,9 @@ use super::event_filter::EventFilter;
use super::event_listener::EventData;
use super::serializable_event::SerializableEventData;

/// Stores the Unix timestamp (in seconds) of the last event received from the ring
type LastEventTime = Arc<AtomicU64>;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopAccessesData {
pub account: Vec<AccessEntry<Address>>,
Expand Down Expand Up @@ -188,6 +199,7 @@ async fn client_read_task(
async fn run_event_forwarder_task(
mut event_receiver: tokio::sync::mpsc::Receiver<EventData>,
event_broadcast_sender: broadcast::Sender<EventDataOrMetrics>,
last_event_time: LastEventTime,
) {
let mut account_accesses = TopKTracker::new(1_000);
let mut storage_accesses = TopKTracker::new(1_000);
Expand All @@ -207,6 +219,13 @@ async fn run_event_forwarder_task(
}
let mut event_data = event_data.unwrap();

// Update last event timestamp for health check
let now_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
last_event_time.store(now_secs, Ordering::Relaxed);

// Track txn_hash from TxnHeaderStart events
if let EventName::TxnHeaderStart = event_data.event_name {
if let ExecEvent::TxnHeaderStart { txn_index, txn_header_start, .. } = &event_data.payload {
Expand Down Expand Up @@ -341,20 +360,60 @@ async fn handle_connection(
info!("WebSocket connection closed: {}", addr);
}

pub async fn run_websocket_server(
server_addr: SocketAddr,
event_receiver: tokio::sync::mpsc::Receiver<EventData>,
) -> Result<(), Box<dyn std::error::Error>> {
// Create a broadcast channel for distributing events to all clients
let (event_broadcast_sender, _) = broadcast::channel::<EventDataOrMetrics>(1_000_000);
async fn health_handler(
last_event_time: LastEventTime,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
let now_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let last_event = last_event_time.load(Ordering::Relaxed);
let is_healthy = now_secs.saturating_sub(last_event) <= 10;

let body = if is_healthy {
info!("Health check passed");
r#"{"success": true}"#
} else {
warn!("Health check failed - last event time: {} seconds ago", now_secs.saturating_sub(last_event));
r#"{"success": false}"#
};

// Spawn a task to forward events from the mpsc channel to the broadcast channel
let event_broadcast_sender_clone = event_broadcast_sender.clone();
let _ = tokio::spawn(run_event_forwarder_task(
event_receiver,
event_broadcast_sender_clone,
));
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap())
}

async fn run_health_server(
health_addr: SocketAddr,
last_event_time: LastEventTime,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let listener = tokio::net::TcpListener::bind(health_addr).await?;
info!("Health server listening on: {}", health_addr);

loop {
let (stream, _) = listener.accept().await?;
let io = hyper_util::rt::TokioIo::new(stream);
let last_event_time = last_event_time.clone();

tokio::spawn(async move {
let service = service_fn(move |_req: Request<hyper::body::Incoming>| {
let last_event_time = last_event_time.clone();
async move { health_handler(last_event_time).await }
});

if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
error!("Health server connection error: {}", e);
}
});
}
}

async fn run_websocket_server(
server_addr: SocketAddr,
event_broadcast_sender: broadcast::Sender<EventDataOrMetrics>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Bind the TCP listener
let listener = TcpListener::bind(&server_addr).await?;
info!("WebSocket server listening on: {}", server_addr);
Expand All @@ -380,3 +439,39 @@ pub async fn run_websocket_server(
}
}
}

pub async fn run_servers(
server_addr: SocketAddr,
health_server_addr: SocketAddr,
event_receiver: tokio::sync::mpsc::Receiver<EventData>,
) -> Result<(), Box<dyn std::error::Error>> {
// Create shared state for tracking last event time (for health checks)
let last_event_time: LastEventTime = Arc::new(AtomicU64::new(0));

// Create a broadcast channel for distributing events to all clients
let (event_broadcast_sender, _) = broadcast::channel::<EventDataOrMetrics>(1_000_000);

// Spawn a task to forward events from the mpsc channel to the broadcast channel
let event_broadcast_sender_clone = event_broadcast_sender.clone();
let last_event_time_clone = last_event_time.clone();
tokio::spawn(run_event_forwarder_task(
event_receiver,
event_broadcast_sender_clone,
last_event_time_clone,
));

// Spawn both servers and wait for either to complete
let websocket_task = tokio::spawn(run_websocket_server(server_addr, event_broadcast_sender));
let health_task = tokio::spawn(run_health_server(health_server_addr, last_event_time));

tokio::select! {
result = websocket_task => {
error!("WebSocket server task stopped: {:?}", result);
}
result = health_task => {
error!("Health server task stopped: {:?}", result);
}
}

Ok(())
}
8 changes: 7 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@ services:
# Optional: Persistent logs volume
- logs:/var/log/eventwatch
restart: unless-stopped
command: ["--server-addr", "0.0.0.0:8443"]
command: ["--server-addr", "0.0.0.0:8443", "--health-server-addr", "0.0.0.0:443"]
healthcheck:
test: ["CMD-SHELL", "curl -sf http://0.0.0.0:443/health | grep -q '\"success\": true'"]
interval: 10s
timeout: 5s
retries: 3
start_period: 10s
logging:
driver: json-file
options:
Expand Down