Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 237 additions & 1 deletion crates/tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,18 @@ impl Actor for SenderAccount {
let _ = UNAGGREGATED_FEES
.remove_label_values(&[&state.sender.to_string(), &allocation_id.to_string()]);

let version = match state.sender_type {
crate::agent::sender_accounts_manager::SenderType::Legacy => TAP_V1,
crate::agent::sender_accounts_manager::SenderType::Horizon => TAP_V2,
};
let _ = UNAGGREGATED_FEES_BY_VERSION.remove_label_values(&[
&state.sender.to_string(),
&allocation_id.to_string(),
version,
]);
let _ = INVALID_RECEIPT_FEES
.remove_label_values(&[&state.sender.to_string(), &allocation_id.to_string()]);

// Check for deny conditions - look up correct allocation variant from state
let allocation_enum = state
.allocation_ids
Expand Down Expand Up @@ -1770,6 +1782,15 @@ impl Actor for SenderAccount {
if let Some(handle) = state.reconciliation_handle.take() {
handle.abort();
}

// Clean up sender-level metrics to avoid stale gauge values
let sender_label = state.sender.to_string();
let _ = SENDER_DENIED.remove_label_values(&[&sender_label]);
let _ = ESCROW_BALANCE.remove_label_values(&[&sender_label]);
let _ = SENDER_FEE_TRACKER.remove_label_values(&[&sender_label]);
let _ = MAX_FEE_PER_SENDER.remove_label_values(&[&sender_label]);
let _ = RAV_REQUEST_TRIGGER_VALUE.remove_label_values(&[&sender_label]);

Ok(())
}
}
Expand Down Expand Up @@ -1835,7 +1856,11 @@ pub mod tests {
Mock, MockServer, ResponseTemplate,
};

use super::{RavInformation, SenderAccountMessage, ALLOCATION_RECONCILIATION_RUNS};
use super::{
RavInformation, SenderAccountMessage, ALLOCATION_RECONCILIATION_RUNS, ESCROW_BALANCE,
INVALID_RECEIPT_FEES, MAX_FEE_PER_SENDER, RAV_REQUEST_TRIGGER_VALUE, SENDER_DENIED,
SENDER_FEE_TRACKER, TAP_V1, UNAGGREGATED_FEES_BY_VERSION,
};
use crate::{
agent::{
sender_account::ReceiptFees, sender_accounts_manager::AllocationId,
Expand Down Expand Up @@ -3100,4 +3125,215 @@ pub mod tests {

sender_account.stop_and_wait(None, None).await.unwrap();
}

/// Test that UNAGGREGATED_FEES_BY_VERSION metric is cleaned up when allocation stops
///
/// This test verifies the fix for stale gauge metrics that were introduced in the
/// Horizon V2 TAP support commit. Previously, UNAGGREGATED_FEES_BY_VERSION was set
/// but never cleaned up when allocations closed, leaving stale values in Prometheus.
#[tokio::test]
async fn test_unaggregated_fees_by_version_cleanup_on_allocation_stop() {
    // Prometheus metrics are global/shared across every test in this process.
    // NOTE(review): ALLOCATION_ID_1 is a shared test asset, not an ID unique to this
    // test; any concurrently running test that stops an allocation with the same
    // (sender, allocation) labels can remove this series early — confirm isolation.
    let allocation_id = test_assets::ALLOCATION_ID_1;

    let test_db = test_assets::setup_shared_test_db().await;
    let pgpool = test_db.pool;

    let (sender_account, mut msg_receiver, prefix, _, _) =
        create_sender_account().pgpool(pgpool).call().await;

    // Create a mock sender allocation and link it to the sender account
    let (mock_sender_allocation, _, next_unaggregated_fees) =
        MockSenderAllocation::new_with_triggered_rav_request(sender_account.clone());

    let name = format!("{}:{}:{}", prefix, SENDER.1, allocation_id);
    let (allocation, _) = MockSenderAllocation::spawn_linked(
        Some(name),
        mock_sender_allocation,
        (),
        sender_account.get_cell(),
    )
    .await
    .unwrap();

    // Send unaggregated fees to trigger metric set
    next_unaggregated_fees.send(1000).unwrap();

    // Directly set the metric to simulate the value being recorded
    // (We do this because the actual message flow is complex and depends on
    // allocation state being properly set up)
    let sender_label = SENDER.1.to_string();
    let allocation_label = allocation_id.to_string();
    UNAGGREGATED_FEES_BY_VERSION
        .with_label_values(&[&sender_label, &allocation_label, TAP_V1])
        .set(1000.0);

    // Verify metric was set
    let metric_value = UNAGGREGATED_FEES_BY_VERSION
        .get_metric_with_label_values(&[&sender_label, &allocation_label, TAP_V1])
        .expect("Metric should exist after being set")
        .get();
    assert_eq!(
        metric_value, 1000.0,
        "Metric should have value 1000.0 after set, got {metric_value}"
    );

    // Stop the allocation - this should trigger ActorTerminated supervision event
    // which in turn should clean up the metric
    allocation.stop_and_wait(None, None).await.unwrap();

    // Give time for supervision event to be processed
    flush_messages(&mut msg_receiver).await;
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Verify metric was cleaned up. `with_label_values` returns the existing series or
    // creates a fresh one with the default value 0, so reading 0 here (after having
    // observed 1000 above) shows the old series was removed rather than kept.
    // See: https://docs.rs/prometheus/latest/prometheus/core/struct.MetricVec.html
    let metric_value_after = UNAGGREGATED_FEES_BY_VERSION
        .with_label_values(&[&sender_label, &allocation_label, TAP_V1])
        .get();
    assert_eq!(
        metric_value_after, 0.0,
        "Metric should be 0 after removal (old value was 1000), got {metric_value_after}"
    );

    // The read above re-created the series; drop it again so this test does not leave
    // a zero-valued gauge behind in the process-global registry.
    let _ = UNAGGREGATED_FEES_BY_VERSION.remove_label_values(&[
        &sender_label,
        &allocation_label,
        TAP_V1,
    ]);

    sender_account.stop_and_wait(None, None).await.unwrap();
}

/// Test that INVALID_RECEIPT_FEES metric is cleaned up when allocation stops
///
/// The gauge is set by hand for the `(sender, allocation)` label pair, the linked mock
/// allocation is stopped, and the series is then expected to read back as the default 0.
#[tokio::test]
async fn test_invalid_receipt_fees_cleanup_on_allocation_stop() {
    // NOTE(review): ALLOCATION_ID_1 is a shared test asset and Prometheus metrics are
    // process-global; confirm no concurrently running test uses the same label pair.
    let allocation_id = test_assets::ALLOCATION_ID_1;

    let test_db = test_assets::setup_shared_test_db().await;
    let pgpool = test_db.pool;

    let (sender_account, mut msg_receiver, prefix, _, _) =
        create_sender_account().pgpool(pgpool).call().await;

    // Create a mock sender allocation and link it to the sender account
    let (mock_sender_allocation, _, next_unaggregated_fees) =
        MockSenderAllocation::new_with_triggered_rav_request(sender_account.clone());

    let name = format!("{}:{}:{}", prefix, SENDER.1, allocation_id);
    let (allocation, _) = MockSenderAllocation::spawn_linked(
        Some(name),
        mock_sender_allocation,
        (),
        sender_account.get_cell(),
    )
    .await
    .unwrap();

    next_unaggregated_fees.send(1000).unwrap();

    // Set the gauge directly so there is a known non-zero value to clean up
    let sender_label = SENDER.1.to_string();
    let allocation_label = allocation_id.to_string();
    INVALID_RECEIPT_FEES
        .with_label_values(&[&sender_label, &allocation_label])
        .set(500.0);

    // Verify metric was set
    let metric_value = INVALID_RECEIPT_FEES
        .get_metric_with_label_values(&[&sender_label, &allocation_label])
        .expect("Metric should exist after being set")
        .get();
    assert_eq!(
        metric_value, 500.0,
        "Metric should have value 500.0 after set, got {metric_value}"
    );

    // Stopping the linked allocation should lead the sender account to drop the series
    allocation.stop_and_wait(None, None).await.unwrap();

    // Give time for the supervision event to be processed
    flush_messages(&mut msg_receiver).await;
    tokio::time::sleep(Duration::from_millis(100)).await;

    // `with_label_values` returns the existing series or creates a fresh one with the
    // default value 0, so reading 0 after having observed 500 shows the old series
    // was removed.
    let metric_value_after = INVALID_RECEIPT_FEES
        .with_label_values(&[&sender_label, &allocation_label])
        .get();
    assert_eq!(
        metric_value_after, 0.0,
        "Metric should be 0 after removal (old value was 500), got {metric_value_after}"
    );

    // The read above re-created the series; drop it again so this test does not leave
    // a zero-valued gauge behind in the process-global registry.
    let _ = INVALID_RECEIPT_FEES.remove_label_values(&[&sender_label, &allocation_label]);

    sender_account.stop_and_wait(None, None).await.unwrap();
}

/// Test that sender-level metrics are cleaned up when SenderAccount stops
///
/// Every sender-labelled gauge is set to a non-zero value and verified, the sender
/// account is stopped, and each gauge is then expected to read back as the default 0.
#[tokio::test]
async fn test_sender_level_gauges_cleanup_on_post_stop() {
    let test_db = test_assets::setup_shared_test_db().await;
    let pgpool = test_db.pool;

    let (sender_account, mut msg_receiver, _, _, _) =
        create_sender_account().pgpool(pgpool).call().await;

    flush_messages(&mut msg_receiver).await;

    let sender_label = SENDER.1.to_string();

    // Set all sender-level metrics to non-zero values
    SENDER_DENIED.with_label_values(&[&sender_label]).set(1);
    ESCROW_BALANCE
        .with_label_values(&[&sender_label])
        .set(1000.0);
    SENDER_FEE_TRACKER
        .with_label_values(&[&sender_label])
        .set(500.0);
    MAX_FEE_PER_SENDER
        .with_label_values(&[&sender_label])
        .set(2000.0);
    RAV_REQUEST_TRIGGER_VALUE
        .with_label_values(&[&sender_label])
        .set(100.0);

    // Verify every metric was set, so each post-stop zero below is a real transition
    // from a known non-zero value.
    assert_eq!(
        SENDER_DENIED
            .get_metric_with_label_values(&[&sender_label])
            .unwrap()
            .get(),
        1
    );
    assert_eq!(
        ESCROW_BALANCE
            .get_metric_with_label_values(&[&sender_label])
            .unwrap()
            .get(),
        1000.0
    );
    assert_eq!(
        SENDER_FEE_TRACKER
            .get_metric_with_label_values(&[&sender_label])
            .unwrap()
            .get(),
        500.0
    );
    assert_eq!(
        MAX_FEE_PER_SENDER
            .get_metric_with_label_values(&[&sender_label])
            .unwrap()
            .get(),
        2000.0
    );
    assert_eq!(
        RAV_REQUEST_TRIGGER_VALUE
            .get_metric_with_label_values(&[&sender_label])
            .unwrap()
            .get(),
        100.0
    );

    // Stop sender account - this triggers post_stop which should clean up metrics
    sender_account.stop_and_wait(None, None).await.unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Verify all sender-level metrics were cleaned up. `with_label_values` returns the
    // existing series or creates a fresh one with the default value 0, so a zero
    // reading here shows the previously set series was removed.
    assert_eq!(
        SENDER_DENIED.with_label_values(&[&sender_label]).get(),
        0,
        "SENDER_DENIED should be 0 after cleanup"
    );
    assert_eq!(
        ESCROW_BALANCE.with_label_values(&[&sender_label]).get(),
        0.0,
        "ESCROW_BALANCE should be 0 after cleanup"
    );
    assert_eq!(
        SENDER_FEE_TRACKER.with_label_values(&[&sender_label]).get(),
        0.0,
        "SENDER_FEE_TRACKER should be 0 after cleanup"
    );
    assert_eq!(
        MAX_FEE_PER_SENDER.with_label_values(&[&sender_label]).get(),
        0.0,
        "MAX_FEE_PER_SENDER should be 0 after cleanup"
    );
    assert_eq!(
        RAV_REQUEST_TRIGGER_VALUE
            .with_label_values(&[&sender_label])
            .get(),
        0.0,
        "RAV_REQUEST_TRIGGER_VALUE should be 0 after cleanup"
    );

    // The reads above re-created each series; drop them again so this test does not
    // leave zero-valued gauges behind in the process-global registry.
    let _ = SENDER_DENIED.remove_label_values(&[&sender_label]);
    let _ = ESCROW_BALANCE.remove_label_values(&[&sender_label]);
    let _ = SENDER_FEE_TRACKER.remove_label_values(&[&sender_label]);
    let _ = MAX_FEE_PER_SENDER.remove_label_values(&[&sender_label]);
    let _ = RAV_REQUEST_TRIGGER_VALUE.remove_label_values(&[&sender_label]);
}
}
Loading