From 9aa7f0a14d1e4ff90dcd3a574e90dd10b8c0b89b Mon Sep 17 00:00:00 2001 From: buddhisthead Date: Wed, 3 Dec 2025 13:07:24 -0800 Subject: [PATCH] Implement snapshot parser for snapshot delegation and pool registration * Implemented parser and type conversions for pool registration * Parsing delegations * Add logging output for new snapshot data to the test example --- common/examples/test_streaming_parser.rs | 71 ++++-- common/src/snapshot/mark_set_go.rs | 290 ++++++++++++---------- common/src/snapshot/streaming_snapshot.rs | 2 +- 3 files changed, 220 insertions(+), 143 deletions(-) diff --git a/common/examples/test_streaming_parser.rs b/common/examples/test_streaming_parser.rs index 7df827e4..a71ebf61 100644 --- a/common/examples/test_streaming_parser.rs +++ b/common/examples/test_streaming_parser.rs @@ -275,28 +275,67 @@ impl SnapshotCallbacks for CountingCallbacks { impl SnapshotsCallback for CountingCallbacks { fn on_snapshots(&mut self, snapshots: RawSnapshotsContainer) -> Result<()> { eprintln!("Raw Snapshots Data:"); + eprintln!(); // Calculate total stakes and delegator counts from VMap data let mark_total: i64 = snapshots.mark.0.iter().map(|(_, amount)| amount).sum(); let set_total: i64 = snapshots.set.0.iter().map(|(_, amount)| amount).sum(); let go_total: i64 = snapshots.go.0.iter().map(|(_, amount)| amount).sum(); - eprintln!( - " Mark snapshot: {} delegators, {} total stake (ADA)", - snapshots.mark.0.len(), - mark_total as f64 / 1_000_000.0 - ); - eprintln!( - " Set snapshot: {} delegators, {} total stake (ADA)", - snapshots.set.0.len(), - set_total as f64 / 1_000_000.0 - ); - eprintln!( - " Go snapshot: {} delegators, {} total stake (ADA)", - snapshots.go.0.len(), - go_total as f64 / 1_000_000.0 - ); - eprintln!(" Fee: {} ADA", snapshots.fee as f64 / 1_000_000.0); + eprintln!("Mark Snapshot:"); + eprintln!(" Delegators: {}", snapshots.mark.0.len()); + eprintln!(" Total stake: {:.2} ADA", mark_total as f64 / 1_000_000.0); + if !snapshots.mark.0.is_empty() { + eprintln!(" Sample stakes (first 5):"); + for (i, (cred, amount)) in snapshots.mark.0.iter().take(5).enumerate() { + let cred_str = cred.to_string().unwrap_or_default(); + eprintln!( + " [{}] {} -> {:.2} ADA", + i + 1, + cred_str, + *amount as f64 / 1_000_000.0 + ); + } + } + eprintln!(); + + eprintln!("Set Snapshot:"); + eprintln!(" Delegators: {}", snapshots.set.0.len()); + eprintln!(" Total stake: {:.2} ADA", set_total as f64 / 1_000_000.0); + if !snapshots.set.0.is_empty() { + eprintln!(" Sample stakes (first 5):"); + for (i, (cred, amount)) in snapshots.set.0.iter().take(5).enumerate() { + let cred_str = cred.to_string().unwrap_or_default(); + eprintln!( + " [{}] {} -> {:.2} ADA", + i + 1, + cred_str, + *amount as f64 / 1_000_000.0 + ); + } + } + eprintln!(); + + eprintln!("Go Snapshot:"); + eprintln!(" Delegators: {}", snapshots.go.0.len()); + eprintln!(" Total stake: {:.2} ADA", go_total as f64 / 1_000_000.0); + if !snapshots.go.0.is_empty() { + eprintln!(" Sample stakes (first 5):"); + for (i, (cred, amount)) in snapshots.go.0.iter().take(5).enumerate() { + let cred_str = cred.to_string().unwrap_or_default(); + eprintln!( + " [{}] {} -> {:.2} ADA", + i + 1, + cred_str, + *amount as f64 / 1_000_000.0 + ); + } + } + eprintln!(); + + eprintln!("Fee: {:.2} ADA", snapshots.fee as f64 / 1_000_000.0); + eprintln!(); + Ok(()) } } diff --git a/common/src/snapshot/mark_set_go.rs b/common/src/snapshot/mark_set_go.rs index 8afb337f..400557e1 100644 --- a/common/src/snapshot/mark_set_go.rs +++ b/common/src/snapshot/mark_set_go.rs @@ -2,15 +2,21 @@ // Mark, Set, Go Snapshots Support // ================================================================================================ -use anyhow::{Context, Result}; -use log::info; +use anyhow::{Context, Error, Result}; +use log::{error, info}; use minicbor::Decoder; use serde::Serialize; +use types::Ratio; + pub use crate::hash::Hash; +use crate::snapshot::pool_params::PoolParams; +use crate::snapshot::streaming_snapshot; pub use crate::stake_addresses::{AccountState, StakeAddressState}; +use crate::PoolId; pub use crate::StakeCredential; +use crate::{address::StakeAddress, types, NetworkId, PoolRegistration}; /// VMap representation for CBOR Map types #[derive(Debug, Clone, PartialEq, Serialize)] @@ -77,11 +83,10 @@ pub trait SnapshotsCallback { pub struct Snapshot { /// snapshot_stake: stake distribution map (credential -> lovelace amount) pub snapshot_stake: VMap, - // snapshot_delegations: delegation map (credential -> stake pool key hash) - // pub snapshot_delegations: VMap, - - // snapshot_pool_params: pool parameters map (stake pool key hash -> pool params) - // snapshot_pool_params: VMap, Vec>, + // snapshot_delegations: vmap>,) + pub snapshot_delegations: VMap, + // snapshot_pool_params: vmap, pool_params>, + pub snapshot_pool_params: VMap, } impl Snapshot { @@ -91,40 +96,6 @@ impl Snapshot { // Check what type we have - could be array, map, or simple value match decoder.datatype().context("Failed to read snapshot datatype")? { - minicbor::data::Type::Map | minicbor::data::Type::MapIndef => { - info!( - " {snapshot_name} snapshot is a map - treating as stake distribution directly" - ); - // Try VMap first, then fallback to simple map - match decoder.decode::>() { - Ok(snapshot_stake) => { - info!( - " {} snapshot - successfully decoded {} stake entries with VMap", - snapshot_name, - snapshot_stake.0.len() - ); - Ok(Snapshot { snapshot_stake }) - } - Err(vmap_error) => { - info!( - " {snapshot_name} snapshot - VMap decode failed: {vmap_error}" - ); - info!( - " {snapshot_name} snapshot - trying simple BTreeMap" - ); - - // Reset decoder and try simple map format - // Note: We can't reset the decoder, so we need to handle this differently - // For now, return an empty snapshot to continue processing - info!( - " {snapshot_name} snapshot - using empty fallback due to format mismatch" - ); - Ok(Snapshot { - snapshot_stake: VMap(Vec::new()), - }) - } - } - } minicbor::data::Type::Array => { info!(" {snapshot_name} snapshot is an array"); decoder.array().context("Failed to parse snapshot array")?; @@ -140,104 +111,171 @@ impl Snapshot { let snapshot_stake: VMap = decoder.decode()?; // Skip delegations (second element) - decoder.skip().context("Failed to skip snapshot_delegations")?; + info!(" {snapshot_name} snapshot - parsing snapshot_delegations..."); - // Skip pool_params (third element) - decoder.skip().context("Failed to skip snapshot_pool_params")?; + let delegations: VMap = + decoder.decode().context("Failed to parse snapshot_delegations")?; - Ok(Snapshot { snapshot_stake }) - } - other_type => { info!( - " {snapshot_name} snapshot - first element is {other_type:?}, skipping entire array" + " {snapshot_name} snapshot - parsing snapshot_pool_registration..." + ); + // pool_registration (third element) + let pools: VMap = decoder + .decode() + .context("Failed to parse snapshot_pool_registration")?; + let registration = VMap( + pools + .0 + .into_iter() + .map(|(pool_id, params)| { + // Convert RewardAccount (Vec) to StakeAddress (arbitralily chosen over ScripHash) + let reward_account = + StakeAddress::from_binary(¶ms.reward_account.0) + .unwrap_or_else(|_| + { + error!("Failed to parse reward account for pool {pool_id}, using default"); + StakeAddress::default() + } + ); + + // Convert Set to Vec + let pool_owners: Vec = params + .owners + .0 + .into_iter() + .map(|keyhash| { + StakeAddress::new( + StakeCredential::AddrKeyHash(keyhash), + NetworkId::Mainnet, // TODO: Make network configurable or get it from parameters + ) + }) + .collect(); + + // Convert Vec to Vec + let relays: Vec = params + .relays + .into_iter() + .map(|relay| match relay { + streaming_snapshot::Relay::SingleHostAddr( + port, + ipv4, + ipv6, + ) => { + let port_opt = match port { + streaming_snapshot::Nullable::Some(p) => { + Some(p as u16) + } + _ => None, + }; + let ipv4_opt = match ipv4 { + streaming_snapshot::Nullable::Some(ip) + if ip.0.len() == 4 => + { + Some(std::net::Ipv4Addr::new( + ip.0[0], ip.0[1], ip.0[2], ip.0[3], + )) + } + _ => None, + }; + let ipv6_opt = match ipv6 { + streaming_snapshot::Nullable::Some(ip) + if ip.0.len() == 16 => + { + let b = &ip.0; + Some(std::net::Ipv6Addr::from([ + b[0], b[1], b[2], b[3], b[4], b[5], + b[6], b[7], b[8], b[9], b[10], b[11], + b[12], b[13], b[14], b[15], + ])) + } + _ => None, + }; + types::Relay::SingleHostAddr( + types::SingleHostAddr { + port: port_opt, + ipv4: ipv4_opt, + ipv6: ipv6_opt, + }, + ) + } + streaming_snapshot::Relay::SingleHostName( + port, + hostname, + ) => { + let port_opt = match port { + streaming_snapshot::Nullable::Some(p) => { + Some(p as u16) + } + _ => None, + }; + types::Relay::SingleHostName( + types::SingleHostName { + port: port_opt, + dns_name: hostname, + }, + ) + } + streaming_snapshot::Relay::MultiHostName(hostname) => { + types::Relay::MultiHostName(types::MultiHostName { + dns_name: hostname, + }) + } + }) + .collect(); + + // Convert Nullable to Option + let pool_metadata = match params.metadata { + streaming_snapshot::Nullable::Some(meta) => { + Some(types::PoolMetadata { + url: meta.url, + hash: meta.hash.to_vec(), + }) + } + _ => None, + }; + + ( + pool_id, + PoolRegistration { + operator: params.id, + vrf_key_hash: params.vrf, + pledge: params.pledge, + cost: params.cost, + margin: Ratio { + numerator: params.margin.numerator, + denominator: params.margin.denominator, + }, + reward_account, + pool_owners, + relays, + pool_metadata, + }, + ) + }) + .collect(), ); - // We don't know how many elements are in this array, so just skip the first element - // and let the array parsing naturally complete - decoder.skip().context("Failed to skip first element")?; - - // Try to skip remaining elements, but don't fail if there aren't exactly 3 - loop { - match decoder.datatype() { - Ok(minicbor::data::Type::Break) => { - // End of indefinite array - break; - } - Ok(_) => { - // More elements to skip - decoder.skip().ok(); // Don't fail on individual skips - } - Err(_) => { - // End of definite array or other error - break - break; - } - } - } + + info!(" {snapshot_name} snapshot - parse completed successfully."); Ok(Snapshot { - snapshot_stake: VMap(Vec::new()), + snapshot_stake, + snapshot_delegations: delegations, + snapshot_pool_params: registration, }) } - } - } - minicbor::data::Type::U32 - | minicbor::data::Type::U64 - | minicbor::data::Type::U8 - | minicbor::data::Type::U16 => { - let value = decoder.u64().context("Failed to parse snapshot value")?; - info!(" {snapshot_name} snapshot is a simple value: {value}"); - - // Return empty snapshot for simple values - Ok(Snapshot { - snapshot_stake: VMap(Vec::new()), - }) - } - minicbor::data::Type::Break => { - info!( - " {snapshot_name} snapshot is a Break token - indicates end of indefinite structure" - ); - // Don't consume the break token, let the parent structure handle it - // Return empty snapshot - Ok(Snapshot { - snapshot_stake: VMap(Vec::new()), - }) - } - minicbor::data::Type::Tag => { - info!( - " {snapshot_name} snapshot starts with a CBOR tag, trying to skip tag and parse content" - ); - let _tag = decoder.tag().context("Failed to read CBOR tag")?; - info!( - " {snapshot_name} snapshot - found tag {_tag}, checking tagged content..." - ); - - // After consuming tag, try to parse the tagged content - match decoder.datatype().context("Failed to read tagged content datatype")? { - minicbor::data::Type::Map | minicbor::data::Type::MapIndef => { - let snapshot_stake: VMap = decoder.decode()?; - Ok(Snapshot { snapshot_stake }) - } - other_tagged_type => { + other_type => { info!( - " {snapshot_name} snapshot - tagged content is {other_tagged_type:?}, skipping" + " {snapshot_name} snapshot - first element is {other_type:?}, skipping entire array" ); - decoder.skip().ok(); // Don't fail on skip - Ok(Snapshot { - snapshot_stake: VMap(Vec::new()), - }) + Err(Error::msg( + "Unexpected first element type in snapshot array", + )) } } } - other_type => { - info!( - " {snapshot_name} snapshot has unexpected type: {other_type:?}, skipping..." - ); - decoder.skip().ok(); // Don't fail on skip - - // Return empty snapshot - Ok(Snapshot { - snapshot_stake: VMap(Vec::new()), - }) - } + other_type => Err(Error::msg(format!( + "Unexpected snapshot data type: {other_type:?}" + ))), } } } diff --git a/common/src/snapshot/streaming_snapshot.rs b/common/src/snapshot/streaming_snapshot.rs index 9d0d2ccf..d2f061c2 100644 --- a/common/src/snapshot/streaming_snapshot.rs +++ b/common/src/snapshot/streaming_snapshot.rs @@ -1767,7 +1767,7 @@ impl StreamingSnapshotParser { /// Parse PState to extract stake pools /// PState = [pools_map, future_pools_map, retiring_map, deposits_map] - fn parse_pstate(decoder: &mut Decoder) -> Result> { + pub fn parse_pstate(decoder: &mut Decoder) -> Result> { // Parse PState array let pstate_len = decoder .array()