Skip to content

Commit 6b4a449

Browse files
authored
Merge pull request #434 from input-output-hk/cet/stream-mark-set-go
feat: update snapshot parser for mark/set/go
2 parents 2b8c204 + abf7593 commit 6b4a449

File tree

10 files changed

+806
-383
lines changed

10 files changed

+806
-383
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
target
22
README.html
3+
restore.sh
4+
startup.sh
5+
configuration/
6+
docker-compose.yaml
37

48
# Nix
59
result

Cargo.lock

Lines changed: 41 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,13 @@ snap-test-streaming: $(SNAPSHOT)
7777
@echo "=================================="
7878
@echo "Snapshot: $(SNAPSHOT)"
7979
@echo "Size: $$(du -h $(SNAPSHOT) | cut -f1)"
80+
@echo "Log Level: $(LOG_LEVEL)"
8081
@echo ""
8182
@test -f "$(SNAPSHOT)" || (echo "Error: Snapshot file not found: $(SNAPSHOT)"; exit 1)
8283
@echo "This will parse the entire snapshot and collect all data with callbacks..."
8384
@echo "Expected time: ~1-3 minutes for 2.4GB snapshot with 11M UTXOs"
8485
@echo ""
85-
@$(CARGO) run --release --example test_streaming_parser -- "$(SNAPSHOT)"
86+
RUST_LOG=$(LOG_LEVEL) $(CARGO) run --release --example test_streaming_parser -- "$(SNAPSHOT)"
8687

8788
# Pattern rule: generate .json manifest from .cbor snapshot
8889
# Usage: make tests/fixtures/my-snapshot.json

common/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ bs58 = "0.5"
2424
chrono = { workspace = true }
2525
crc = "3"
2626
hex = { workspace = true }
27+
log = "0.4"
2728
memmap2 = "0.9"
2829
num-rational = { version = "0.4.2", features = ["serde"] }
2930
regex = "1"
@@ -45,7 +46,7 @@ sha2 = "0.10.8"
4546
tempfile = "3.23"
4647
config = { workspace = true }
4748
caryatid_process = { workspace = true }
48-
49+
env_logger = "0.10"
4950

5051
[lib]
5152
crate-type = ["rlib"]

common/examples/test_streaming_parser.rs

Lines changed: 100 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@
22
//
33
// Usage: cargo run --example test_streaming_parser --release -- <snapshot_path>
44

5-
use acropolis_common::snapshot::streaming_snapshot::{
5+
use acropolis_common::snapshot::{
66
AccountState, DRepCallback, DRepInfo, GovernanceProposal, PoolCallback, PoolInfo,
7-
ProposalCallback, SnapshotCallbacks, SnapshotMetadata, StakeCallback, StreamingSnapshotParser,
8-
UtxoCallback, UtxoEntry,
7+
ProposalCallback, RawSnapshotsContainer, SnapshotCallbacks, SnapshotMetadata,
8+
SnapshotsCallback, StakeCallback, StreamingSnapshotParser, UtxoCallback, UtxoEntry,
99
};
1010
use anyhow::Result;
1111
use std::env;
1212
use std::time::Instant;
1313

14+
use env_logger::Env;
15+
1416
// Simple counter callback that doesn't store data in memory
1517
#[derive(Default)]
1618
struct CountingCallbacks {
@@ -55,7 +57,7 @@ impl UtxoCallback for CountingCallbacks {
5557
impl PoolCallback for CountingCallbacks {
5658
fn on_pools(&mut self, pools: Vec<PoolInfo>) -> Result<()> {
5759
self.pool_count = pools.len();
58-
eprintln!("Parsed {} stake pools", pools.len());
60+
eprintln!("Parsed {} stake pools", pools.len());
5961

6062
// Show first 10 pools
6163
for (i, pool) in pools.iter().take(10).enumerate() {
@@ -79,7 +81,7 @@ impl StakeCallback for CountingCallbacks {
7981
fn on_accounts(&mut self, accounts: Vec<AccountState>) -> Result<()> {
8082
self.account_count = accounts.len();
8183
if !accounts.is_empty() {
82-
eprintln!("Parsed {} stake accounts", accounts.len());
84+
eprintln!("Parsed {} stake accounts", accounts.len());
8385

8486
// Show first 10 accounts
8587
for (i, account) in accounts.iter().take(10).enumerate() {
@@ -104,7 +106,7 @@ impl StakeCallback for CountingCallbacks {
104106
impl DRepCallback for CountingCallbacks {
105107
fn on_dreps(&mut self, dreps: Vec<DRepInfo>) -> Result<()> {
106108
self.drep_count = dreps.len();
107-
eprintln!("Parsed {} DReps", self.drep_count);
109+
eprintln!("Parsed {} DReps", self.drep_count);
108110

109111
// Show first 10 DReps
110112
for (i, drep) in dreps.iter().take(10).enumerate() {
@@ -136,7 +138,7 @@ impl ProposalCallback for CountingCallbacks {
136138
fn on_proposals(&mut self, proposals: Vec<GovernanceProposal>) -> Result<()> {
137139
self.proposal_count = proposals.len();
138140
if !proposals.is_empty() {
139-
eprintln!("Parsed {} governance proposals", proposals.len());
141+
eprintln!("Parsed {} governance proposals", proposals.len());
140142

141143
// Show first 10 proposals
142144
for (i, proposal) in proposals.iter().take(10).enumerate() {
@@ -159,22 +161,22 @@ impl ProposalCallback for CountingCallbacks {
159161

160162
impl SnapshotCallbacks for CountingCallbacks {
161163
fn on_metadata(&mut self, metadata: SnapshotMetadata) -> Result<()> {
162-
eprintln!("📊 Snapshot Metadata:");
163-
eprintln!(" Epoch: {}", metadata.epoch);
164+
eprintln!("Snapshot Metadata:");
165+
eprintln!(" Epoch: {}", metadata.epoch);
164166
eprintln!(
165-
" Treasury: {} ADA",
167+
" Treasury: {} ADA",
166168
metadata.pot_balances.treasury as f64 / 1_000_000.0
167169
);
168170
eprintln!(
169-
" Reserves: {} ADA",
171+
" Reserves: {} ADA",
170172
metadata.pot_balances.reserves as f64 / 1_000_000.0
171173
);
172174
eprintln!(
173-
" Deposits: {} ADA",
175+
" Deposits: {} ADA",
174176
metadata.pot_balances.deposits as f64 / 1_000_000.0
175177
);
176178
if let Some(count) = metadata.utxo_count {
177-
eprintln!(" UTXO count: {}", count);
179+
eprintln!(" UTXO count: {count}");
178180
}
179181
// Calculate total blocks produced
180182
let total_blocks_previous: u32 =
@@ -183,19 +185,43 @@ impl SnapshotCallbacks for CountingCallbacks {
183185
metadata.blocks_current_epoch.iter().map(|p| p.block_count as u32).sum();
184186

185187
eprintln!(
186-
" Block production previous epoch: {} pools produced {} blocks total",
188+
" Block production previous epoch: {} pools produced {} blocks total",
187189
metadata.blocks_previous_epoch.len(),
188190
total_blocks_previous
189191
);
190192
eprintln!(
191-
" Block production current epoch: {} pools produced {} blocks total",
193+
" Block production current epoch: {} pools produced {} blocks total",
192194
metadata.blocks_current_epoch.len(),
193195
total_blocks_current
194196
);
195197

198+
// Show snapshots info if available
199+
if let Some(snapshots_info) = &metadata.snapshots {
200+
eprintln!(" Snapshots Info:");
201+
eprintln!(
202+
" Mark snapshot: {} sections",
203+
snapshots_info.mark.sections_count
204+
);
205+
eprintln!(
206+
" Set snapshot: {} sections",
207+
snapshots_info.set.sections_count
208+
);
209+
eprintln!(
210+
" Go snapshot: {} sections",
211+
snapshots_info.go.sections_count
212+
);
213+
eprintln!(
214+
" Fee value: {} lovelace ({} ADA)",
215+
snapshots_info.fee,
216+
snapshots_info.fee as f64 / 1_000_000.0
217+
);
218+
} else {
219+
eprintln!(" No snapshots data available");
220+
}
221+
196222
// Show top block producers if any
197223
if !metadata.blocks_previous_epoch.is_empty() {
198-
eprintln!(" 📦 Previous epoch top producers (first 3):");
224+
eprintln!(" Previous epoch top producers (first 3):");
199225
let mut sorted_previous = metadata.blocks_previous_epoch.clone();
200226
sorted_previous.sort_by(|a, b| b.block_count.cmp(&a.block_count));
201227
for (i, production) in sorted_previous.iter().take(3).enumerate() {
@@ -216,7 +242,7 @@ impl SnapshotCallbacks for CountingCallbacks {
216242
}
217243

218244
if !metadata.blocks_current_epoch.is_empty() {
219-
eprintln!(" 📦 Current epoch top producers (first 3):");
245+
eprintln!(" Current epoch top producers (first 3):");
220246
let mut sorted_current = metadata.blocks_current_epoch.clone();
221247
sorted_current.sort_by(|a, b| b.block_count.cmp(&a.block_count));
222248
for (i, production) in sorted_current.iter().take(3).enumerate() {
@@ -246,6 +272,35 @@ impl SnapshotCallbacks for CountingCallbacks {
246272
}
247273
}
248274

275+
impl SnapshotsCallback for CountingCallbacks {
276+
fn on_snapshots(&mut self, snapshots: RawSnapshotsContainer) -> Result<()> {
277+
eprintln!("Raw Snapshots Data:");
278+
279+
// Calculate total stakes and delegator counts from VMap data
280+
let mark_total: i64 = snapshots.mark.0.iter().map(|(_, amount)| amount).sum();
281+
let set_total: i64 = snapshots.set.0.iter().map(|(_, amount)| amount).sum();
282+
let go_total: i64 = snapshots.go.0.iter().map(|(_, amount)| amount).sum();
283+
284+
eprintln!(
285+
" Mark snapshot: {} delegators, {} total stake (ADA)",
286+
snapshots.mark.0.len(),
287+
mark_total as f64 / 1_000_000.0
288+
);
289+
eprintln!(
290+
" Set snapshot: {} delegators, {} total stake (ADA)",
291+
snapshots.set.0.len(),
292+
set_total as f64 / 1_000_000.0
293+
);
294+
eprintln!(
295+
" Go snapshot: {} delegators, {} total stake (ADA)",
296+
snapshots.go.0.len(),
297+
go_total as f64 / 1_000_000.0
298+
);
299+
eprintln!(" Fee: {} ADA", snapshots.fee as f64 / 1_000_000.0);
300+
Ok(())
301+
}
302+
}
303+
249304
fn main() {
250305
// Get snapshot path from command line
251306
let args: Vec<String> = env::args().collect();
@@ -255,11 +310,14 @@ fn main() {
255310
std::process::exit(1);
256311
}
257312

313+
// Initialize env_logger to read RUST_LOG environment variable
314+
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
315+
258316
let snapshot_path = &args[1];
259-
println!("🚀 Streaming Snapshot Parser Test with Block Parsing");
317+
println!("Streaming Snapshot Parser Test with Block Parsing");
260318
println!("====================================================");
261-
println!("Snapshot: {}", snapshot_path);
262-
println!("Features: UTXOs, Pools, Accounts, DReps, Proposals, and 📦 BLOCKS!");
319+
println!("Snapshot: {snapshot_path}");
320+
println!("Features: UTXOs, Pools, Accounts, DReps, Proposals, and BLOCKS!");
263321
println!();
264322

265323
// Create parser and callbacks
@@ -273,37 +331,50 @@ fn main() {
273331
match parser.parse(&mut callbacks) {
274332
Ok(()) => {
275333
let duration = start.elapsed();
276-
println!("Parse completed successfully in {:.2?}", duration);
334+
println!("Parse completed successfully in {duration:.2?}");
277335
println!();
278336

279337
// Display results
280338
if let Some(metadata) = &callbacks.metadata {
281-
println!("📊 Final Metadata Summary:");
339+
println!("Final Metadata Summary:");
282340
println!(" Epoch: {}", metadata.epoch);
283341
println!(" Treasury: {} lovelace", metadata.pot_balances.treasury);
284342
println!(" Reserves: {} lovelace", metadata.pot_balances.reserves);
285343
println!(" Deposits: {} lovelace", metadata.pot_balances.deposits);
286344
if let Some(count) = metadata.utxo_count {
287-
println!(" UTXO Count (metadata): {}", count);
345+
println!(" UTXO Count (metadata): {count}");
288346
}
289347
let total_blocks_previous: u32 =
290348
metadata.blocks_previous_epoch.iter().map(|p| p.block_count as u32).sum();
291349
let total_blocks_current: u32 =
292350
metadata.blocks_current_epoch.iter().map(|p| p.block_count as u32).sum();
293351
println!(
294-
" 📦 Block production previous epoch: {} pools, {} blocks total",
352+
" Block production previous epoch: {} pools, {} blocks total",
295353
metadata.blocks_previous_epoch.len(),
296354
total_blocks_previous
297355
);
298356
println!(
299-
" 📦 Block production current epoch: {} pools, {} blocks total",
357+
" Block production current epoch: {} pools, {} blocks total",
300358
metadata.blocks_current_epoch.len(),
301359
total_blocks_current
302360
);
361+
362+
// Show snapshots info summary
363+
if let Some(snapshots_info) = &metadata.snapshots {
364+
println!(" Snapshots Summary:");
365+
println!(
366+
" Mark: {} sections, Set: {} sections, Go: {} sections, Fee: {} ADA",
367+
snapshots_info.mark.sections_count,
368+
snapshots_info.set.sections_count,
369+
snapshots_info.go.sections_count,
370+
snapshots_info.fee as f64 / 1_000_000.0
371+
);
372+
}
373+
303374
println!();
304375
}
305376

306-
println!("📈 Parsed Data Summary:");
377+
println!("Parsed Data Summary:");
307378
println!(" UTXOs: {}", callbacks.utxo_count);
308379
println!(" Stake Pools: {}", callbacks.pool_count);
309380
println!(" Stake Accounts: {}", callbacks.account_count);
@@ -395,14 +466,14 @@ fn main() {
395466
// Performance stats
396467
let utxos_per_sec = callbacks.utxo_count as f64 / duration.as_secs_f64();
397468
println!("Performance:");
398-
println!(" Total time: {:.2?}", duration);
399-
println!(" UTXOs/second: {:.0}", utxos_per_sec);
469+
println!(" Total time: {duration:.2?}");
470+
println!(" UTXOs/second: {utxos_per_sec:.0}");
400471
println!();
401472

402473
std::process::exit(0);
403474
}
404475
Err(e) => {
405-
eprintln!("Parse failed: {:?}", e);
476+
eprintln!("Parse failed: {e:?}");
406477
eprintln!();
407478
std::process::exit(1);
408479
}

0 commit comments

Comments
 (0)