Skip to content

Commit 2b8c204

Browse files
authored
Merge pull request #420 from input-output-hk/gd/fix-vrf
fix: publish epoch's active nonce as separate message
2 parents 6b33525 + c4110a1 commit 2b8c204

File tree

14 files changed

+193
-102
lines changed

14 files changed

+193
-102
lines changed

common/src/messages.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ pub enum CardanoMessage {
321321
PotDeltas(PotDeltasMessage), // Changes to pot balances
322322
BlockInfoMessage(BlockTxsMessage), // Transaction Info (total count, total output, total fees in a block)
323323
EpochActivity(EpochActivityMessage), // Total fees and VRF keys for an epoch
324+
EpochNonce(Option<Nonce>), // Epoch nonce for the current epoch
324325
DRepState(DRepStateMessage), // Active DReps at epoch end
325326
SPOState(SPOStateMessage), // Active SPOs at epoch end
326327
GovernanceProcedures(GovernanceProceduresMessage), // Governance procedures received

common/src/validation.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ use std::array::TryFromSliceError;
77

88
use thiserror::Error;
99

10-
use crate::{protocol_params::Nonce, GenesisKeyhash, PoolId, Slot, VrfKeyHash};
10+
use crate::{
11+
protocol_params::Nonce, rational_number::RationalNumber, GenesisKeyhash, PoolId, Slot,
12+
VrfKeyHash,
13+
};
1114

1215
/// Validation error
1316
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Error)]
@@ -61,7 +64,7 @@ pub enum VrfValidationError {
6164
PraosBadVrfProof(#[from] PraosBadVrfProofError),
6265
/// **Cause:** The VRF output is too large for this pool's stake.
6366
/// The pool lost the slot lottery
64-
#[error("VRF Leader Value Too Big")]
67+
#[error("{0}")]
6568
VrfLeaderValueTooBig(#[from] VrfLeaderValueTooBigError),
6669
/// **Cause:** This slot is in the overlay schedule but marked as non-active.
6770
/// It's an intentional gap slot where no blocks should be produced.
@@ -166,8 +169,12 @@ impl PartialEq for PraosBadVrfProofError {
166169
// ------------------------------------------------------------ VrfLeaderValueTooBigError
167170
#[derive(Error, Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
168171
pub enum VrfLeaderValueTooBigError {
169-
#[error("VRF Leader Value Too Big")]
170-
VrfLeaderValueTooBig,
172+
#[error("VRF Leader Value Too Big: pool_id={pool_id}, active_stake={active_stake}, relative_stake={relative_stake}")]
173+
VrfLeaderValueTooBig {
174+
pool_id: PoolId,
175+
active_stake: u64,
176+
relative_stake: RationalNumber,
177+
},
171178
}
172179

173180
// ------------------------------------------------------------ BadVrfProofError

modules/accounts_state/src/accounts_state.rs

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use acropolis_common::{
99
BlockInfo, BlockStatus,
1010
};
1111
use anyhow::Result;
12-
use bigdecimal::Zero;
1312
use caryatid_sdk::{message_bus::Subscription, module, Context};
1413
use config::Config;
1514
use std::sync::Arc;
@@ -36,7 +35,7 @@ use acropolis_common::queries::accounts::{
3635
use acropolis_common::queries::errors::QueryError;
3736
use verifier::Verifier;
3837

39-
use crate::spo_distribution_store::SPDDStore;
38+
use crate::spo_distribution_store::{SPDDStore, SPDDStoreConfig};
4039
mod spo_distribution_store;
4140

4241
const DEFAULT_SPO_STATE_TOPIC: &str = "cardano.spo.state";
@@ -54,6 +53,7 @@ const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas";
5453

5554
const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./fjall-spdd");
5655
const DEFAULT_SPDD_RETENTION_EPOCHS: (&str, u64) = ("spdd-retention-epochs", 0);
56+
const DEFAULT_SPDD_CLEAR_ON_START: (&str, bool) = ("spdd-clear-on-start", true);
5757

5858
/// Accounts State module
5959
#[module(
@@ -161,6 +161,23 @@ impl AccountsState {
161161
None => None,
162162
};
163163

164+
// Publish SPDD message before anything else and store spdd history if enabled
165+
if let Some(block_info) = current_block.as_ref() {
166+
let spdd = state.generate_spdd();
167+
if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await {
168+
error!("Error publishing SPO stake distribution: {e:#}")
169+
}
170+
171+
// if we store spdd history
172+
let spdd_state = state.dump_spdd_state();
173+
if let Some(mut spdd_store) = spdd_store_guard {
174+
// active stakes taken at beginning of epoch i is for epoch + 1
175+
if let Err(e) = spdd_store.store_spdd(block_info.epoch + 1, spdd_state) {
176+
error!("Error storing SPDD state: {e:#}")
177+
}
178+
}
179+
}
180+
164181
// Handle DRep
165182
let (_, message) = drep_state_subscription.read_ignoring_rollbacks().await?;
166183
match message.as_ref() {
@@ -197,22 +214,6 @@ impl AccountsState {
197214
.handle_spo_state(spo_msg)
198215
.inspect_err(|e| error!("SPOState handling error: {e:#}"))
199216
.ok();
200-
201-
let spdd = state.generate_spdd();
202-
if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await {
203-
error!("Error publishing SPO stake distribution: {e:#}")
204-
}
205-
206-
// if we store spdd history
207-
let spdd_state = state.dump_spdd_state();
208-
if let Some(mut spdd_store) = spdd_store_guard {
209-
// active stakes taken at beginning of epoch i is for epoch + 1
210-
if let Err(e) =
211-
spdd_store.store_spdd(block_info.epoch + 1, spdd_state)
212-
{
213-
error!("Error storing SPDD state: {e:#}")
214-
}
215-
}
216217
}
217218
.instrument(span)
218219
.await;
@@ -435,24 +436,18 @@ impl AccountsState {
435436
.unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_TOPIC.to_string());
436437
info!("Creating stake reward deltas publisher on '{stake_reward_deltas_topic}'");
437438

439+
// SPDD configs
438440
let spdd_db_path =
439441
config.get_string(DEFAULT_SPDD_DB_PATH.0).unwrap_or(DEFAULT_SPDD_DB_PATH.1.to_string());
440-
441-
// Convert to absolute path if relative
442-
let spdd_db_path = if std::path::Path::new(&spdd_db_path).is_absolute() {
443-
spdd_db_path
444-
} else {
445-
let current_dir = std::env::current_dir()
446-
.map_err(|e| anyhow::anyhow!("Failed to get current directory: {}", e))?;
447-
current_dir.join(&spdd_db_path).to_string_lossy().to_string()
448-
};
449-
450-
// Get SPDD retention epochs configuration
442+
info!("SPDD database path: {spdd_db_path}");
451443
let spdd_retention_epochs = config
452444
.get_int(DEFAULT_SPDD_RETENTION_EPOCHS.0)
453445
.unwrap_or(DEFAULT_SPDD_RETENTION_EPOCHS.1 as i64)
454446
.max(0) as u64;
455447
info!("SPDD retention epochs: {:?}", spdd_retention_epochs);
448+
let spdd_clear_on_start =
449+
config.get_bool(DEFAULT_SPDD_CLEAR_ON_START.0).unwrap_or(DEFAULT_SPDD_CLEAR_ON_START.1);
450+
info!("SPDD clear on start: {spdd_clear_on_start}");
456451

457452
// Query topics
458453
let accounts_query_topic = config
@@ -482,11 +477,13 @@ impl AccountsState {
482477
let history_tick = history.clone();
483478

484479
// Spdd store
485-
let spdd_store = if !spdd_retention_epochs.is_zero() {
486-
Some(Arc::new(Mutex::new(SPDDStore::load(
487-
std::path::Path::new(&spdd_db_path),
488-
spdd_retention_epochs,
489-
)?)))
480+
let spdd_store_config = SPDDStoreConfig {
481+
path: spdd_db_path,
482+
retention_epochs: spdd_retention_epochs,
483+
clear_on_start: spdd_clear_on_start,
484+
};
485+
let spdd_store = if spdd_store_config.is_enabled() {
486+
Some(Arc::new(Mutex::new(SPDDStore::new(&spdd_store_config)?)))
490487
} else {
491488
None
492489
};

modules/accounts_state/src/spo_distribution_store.rs

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use acropolis_common::{PoolId, StakeAddress};
22
use anyhow::Result;
3+
use bigdecimal::Zero;
34
use fjall::{Config, Keyspace, PartitionCreateOptions};
45
use std::collections::HashMap;
56

@@ -43,6 +44,19 @@ fn encode_epoch_marker(epoch: u64) -> Vec<u8> {
4344
epoch.to_be_bytes().to_vec()
4445
}
4546

47+
#[derive(Debug, Clone)]
48+
pub struct SPDDStoreConfig {
49+
pub path: String,
50+
pub retention_epochs: u64,
51+
pub clear_on_start: bool,
52+
}
53+
54+
impl SPDDStoreConfig {
55+
pub fn is_enabled(&self) -> bool {
56+
!self.retention_epochs.is_zero()
57+
}
58+
}
59+
4660
pub struct SPDDStore {
4761
keyspace: Keyspace,
4862
/// Partition for all SPDD data
@@ -58,10 +72,9 @@ pub struct SPDDStore {
5872
}
5973

6074
impl SPDDStore {
61-
#[allow(dead_code)]
62-
pub fn new(path: impl AsRef<std::path::Path>, retention_epochs: u64) -> fjall::Result<Self> {
63-
let path = path.as_ref();
64-
if path.exists() {
75+
pub fn new(config: &SPDDStoreConfig) -> fjall::Result<Self> {
76+
let path = std::path::Path::new(&config.path);
77+
if config.clear_on_start && path.exists() {
6578
std::fs::remove_dir_all(path)?;
6679
}
6780

@@ -74,23 +87,7 @@ impl SPDDStore {
7487
keyspace,
7588
spdd,
7689
epoch_markers,
77-
retention_epochs,
78-
})
79-
}
80-
81-
pub fn load(path: impl AsRef<std::path::Path>, retention_epochs: u64) -> fjall::Result<Self> {
82-
let path = path.as_ref();
83-
84-
let keyspace = Config::new(path).open()?;
85-
let spdd = keyspace.open_partition("spdd", PartitionCreateOptions::default())?;
86-
let epoch_markers =
87-
keyspace.open_partition("epoch_markers", PartitionCreateOptions::default())?;
88-
89-
Ok(Self {
90-
keyspace,
91-
spdd,
92-
epoch_markers,
93-
retention_epochs,
90+
retention_epochs: config.retention_epochs,
9491
})
9592
}
9693

@@ -237,11 +234,18 @@ mod tests {
237234
StakeAddress::new(StakeCredential::AddrKeyHash(keyhash_224(&[byte])), Mainnet)
238235
}
239236

237+
fn spdd_store_config(retention_epochs: u64) -> SPDDStoreConfig {
238+
SPDDStoreConfig {
239+
path: TempDir::new().unwrap().path().to_string_lossy().into_owned(),
240+
retention_epochs,
241+
clear_on_start: true,
242+
}
243+
}
244+
240245
#[test]
241246
fn test_store_and_query_spdd() {
242-
let temp_dir = TempDir::new().unwrap();
243-
let mut spdd_store =
244-
SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store");
247+
let config = spdd_store_config(10);
248+
let mut spdd_store = SPDDStore::new(&config).expect("Failed to create SPDD store");
245249

246250
let mut spdd_state: HashMap<PoolId, Vec<(StakeAddress, u64)>> = HashMap::new();
247251
spdd_state.insert(
@@ -273,9 +277,8 @@ mod tests {
273277

274278
#[test]
275279
fn test_retention_pruning() {
276-
let temp_dir = TempDir::new().unwrap();
277-
let mut spdd_store =
278-
SPDDStore::new(temp_dir.path(), 2).expect("Failed to create SPDD store");
280+
let config = spdd_store_config(2);
281+
let mut spdd_store = SPDDStore::new(&config).expect("Failed to create SPDD store");
279282

280283
// Store epochs 1, 2, 3
281284
for epoch in 1..=3 {
@@ -302,8 +305,8 @@ mod tests {
302305

303306
#[test]
304307
fn test_query_incomplete_epoch() {
305-
let temp_dir = TempDir::new().unwrap();
306-
let spdd_store = SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store");
308+
let config = spdd_store_config(10);
309+
let spdd_store = SPDDStore::new(&config).expect("Failed to create SPDD store");
307310

308311
assert!(!spdd_store.is_epoch_complete(999).unwrap());
309312
assert!(spdd_store.query_by_epoch(999).is_err());
@@ -312,9 +315,8 @@ mod tests {
312315

313316
#[test]
314317
fn test_remove_epoch_data() {
315-
let temp_dir = TempDir::new().unwrap();
316-
let mut spdd_store =
317-
SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store");
318+
let config = spdd_store_config(10);
319+
let mut spdd_store = SPDDStore::new(&config).expect("Failed to create SPDD store");
318320

319321
let mut spdd_state: HashMap<PoolId, Vec<(StakeAddress, u64)>> = HashMap::new();
320322
spdd_state.insert(

modules/block_vrf_validator/src/block_vrf_validator.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = (
3434
"protocol-parameters-subscribe-topic",
3535
"cardano.protocol.parameters",
3636
);
37-
const DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC: (&str, &str) =
38-
("epoch-activity-subscribe-topic", "cardano.epoch.activity");
37+
const DEFAULT_EPOCH_NONCE_SUBSCRIBE_TOPIC: (&str, &str) =
38+
("epoch-nonce-subscribe-topic", "cardano.epoch.nonce");
3939
const DEFAULT_SPO_STATE_SUBSCRIBE_TOPIC: (&str, &str) =
4040
("spo-state-subscribe-topic", "cardano.spo.state");
4141
const DEFAULT_SPDD_SUBSCRIBE_TOPIC: (&str, &str) =
@@ -58,7 +58,7 @@ impl BlockVrfValidator {
5858
mut bootstrapped_subscription: Box<dyn Subscription<Message>>,
5959
mut block_subscription: Box<dyn Subscription<Message>>,
6060
mut protocol_parameters_subscription: Box<dyn Subscription<Message>>,
61-
mut epoch_activity_subscription: Box<dyn Subscription<Message>>,
61+
mut epoch_nonce_subscription: Box<dyn Subscription<Message>>,
6262
mut spo_state_subscription: Box<dyn Subscription<Message>>,
6363
mut spdd_subscription: Box<dyn Subscription<Message>>,
6464
) -> Result<()> {
@@ -90,8 +90,10 @@ impl BlockVrfValidator {
9090

9191
if is_new_epoch {
9292
// read epoch boundary messages
93-
let (_, protocol_parameters_msg) =
94-
protocol_parameters_subscription.read_ignoring_rollbacks().await?;
93+
let protocol_parameters_message_f = protocol_parameters_subscription.read();
94+
let epoch_nonce_message_f = epoch_nonce_subscription.read();
95+
96+
let (_, protocol_parameters_msg) = protocol_parameters_message_f.await?;
9597
let span = info_span!(
9698
"block_vrf_validator.handle_protocol_parameters",
9799
epoch = block_info.epoch
@@ -104,18 +106,20 @@ impl BlockVrfValidator {
104106
_ => error!("Unexpected message type: {protocol_parameters_msg:?}"),
105107
});
106108

107-
let (_, epoch_activity_msg) =
108-
epoch_activity_subscription.read_ignoring_rollbacks().await?;
109+
let (_, epoch_nonce_msg) = epoch_nonce_message_f.await?;
109110
let span = info_span!(
110-
"block_vrf_validator.handle_epoch_activity",
111+
"block_vrf_validator.handle_epoch_nonce",
111112
epoch = block_info.epoch
112113
);
113-
span.in_scope(|| match epoch_activity_msg.as_ref() {
114-
Message::Cardano((block_info, CardanoMessage::EpochActivity(msg))) => {
114+
span.in_scope(|| match epoch_nonce_msg.as_ref() {
115+
Message::Cardano((
116+
block_info,
117+
CardanoMessage::EpochNonce(active_nonce),
118+
)) => {
115119
Self::check_sync(&current_block, block_info);
116-
state.handle_epoch_activity(msg);
120+
state.handle_epoch_nonce(active_nonce);
117121
}
118-
_ => error!("Unexpected message type: {epoch_activity_msg:?}"),
122+
_ => error!("Unexpected message type: {epoch_nonce_msg:?}"),
119123
});
120124

121125
let (_, spo_state_msg) =
@@ -194,10 +198,10 @@ impl BlockVrfValidator {
194198
.unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string());
195199
info!("Creating block subscription on '{block_subscribe_topic}'");
196200

197-
let epoch_activity_subscribe_topic = config
198-
.get_string(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.0)
199-
.unwrap_or(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.1.to_string());
200-
info!("Creating epoch activity subscription on '{epoch_activity_subscribe_topic}'");
201+
let epoch_nonce_subscribe_topic = config
202+
.get_string(DEFAULT_EPOCH_NONCE_SUBSCRIBE_TOPIC.0)
203+
.unwrap_or(DEFAULT_EPOCH_NONCE_SUBSCRIBE_TOPIC.1.to_string());
204+
info!("Creating epoch nonce subscription on '{epoch_nonce_subscribe_topic}'");
201205

202206
let spo_state_subscribe_topic = config
203207
.get_string(DEFAULT_SPO_STATE_SUBSCRIBE_TOPIC.0)
@@ -218,8 +222,7 @@ impl BlockVrfValidator {
218222
let protocol_parameters_subscription =
219223
context.subscribe(&protocol_parameters_subscribe_topic).await?;
220224
let block_subscription = context.subscribe(&block_subscribe_topic).await?;
221-
let epoch_activity_subscription =
222-
context.subscribe(&epoch_activity_subscribe_topic).await?;
225+
let epoch_nonce_subscription = context.subscribe(&epoch_nonce_subscribe_topic).await?;
223226
let spo_state_subscription = context.subscribe(&spo_state_subscribe_topic).await?;
224227
let spdd_subscription = context.subscribe(&spdd_subscribe_topic).await?;
225228

@@ -237,7 +240,7 @@ impl BlockVrfValidator {
237240
bootstrapped_subscription,
238241
block_subscription,
239242
protocol_parameters_subscription,
240-
epoch_activity_subscription,
243+
epoch_nonce_subscription,
241244
spo_state_subscription,
242245
spdd_subscription,
243246
)

modules/block_vrf_validator/src/ouroboros/praos.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ pub fn validate_vrf_praos<'a>(
7070
}),
7171
Box::new(move || {
7272
validate_vrf_leader_value(
73+
&pool_id,
7374
&header.leader_vrf_output().map_err(|_| {
7475
VrfValidationError::Other("Leader VRF Output is not set".to_string())
7576
})?[..],

0 commit comments

Comments
 (0)