Skip to content

Commit a4db00d

Browse files
authored
Merge pull request #423 from input-output-hk/sg/chain-sync-tweaks
Introduce new rollback strategy
2 parents 9ac80db + 605e6e4 commit a4db00d

File tree

38 files changed

+552
-322
lines changed

38 files changed

+552
-322
lines changed

common/src/caryatid.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use std::sync::Arc;
2+
3+
use anyhow::Result;
4+
use caryatid_sdk::{async_trait, Context, MessageBounds, Subscription};
5+
6+
use crate::messages::{CardanoMessage, Message, StateTransitionMessage};
7+
8+
#[async_trait]
9+
pub trait SubscriptionExt<M: MessageBounds> {
10+
async fn read_ignoring_rollbacks(&mut self) -> Result<(String, Arc<M>)>;
11+
}
12+
13+
#[async_trait]
14+
impl SubscriptionExt<Message> for Box<dyn Subscription<Message>> {
15+
async fn read_ignoring_rollbacks(&mut self) -> Result<(String, Arc<Message>)> {
16+
loop {
17+
let (stream, message) = self.read().await?;
18+
if matches!(
19+
message.as_ref(),
20+
Message::Cardano((
21+
_,
22+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_))
23+
))
24+
) {
25+
continue;
26+
}
27+
break Ok((stream, message));
28+
}
29+
}
30+
}
31+
32+
/// A utility to publish messages, which will only publish rollback messages if some work has been rolled back
33+
pub struct RollbackAwarePublisher<M: MessageBounds> {
34+
/// Module context
35+
context: Arc<Context<M>>,
36+
37+
/// Topic to publish on
38+
topic: String,
39+
40+
// At which slot did we publish our last non-rollback message
41+
last_activity_at: Option<u64>,
42+
}
43+
44+
impl RollbackAwarePublisher<Message> {
45+
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
46+
Self {
47+
context,
48+
topic,
49+
last_activity_at: None,
50+
}
51+
}
52+
53+
pub async fn publish(&mut self, message: Arc<Message>) -> Result<()> {
54+
match message.as_ref() {
55+
Message::Cardano((
56+
block,
57+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
58+
)) => {
59+
if self.last_activity_at.is_some_and(|slot| slot >= block.slot) {
60+
self.last_activity_at = None;
61+
self.context.publish(&self.topic, message).await?;
62+
}
63+
Ok(())
64+
}
65+
Message::Cardano((block, _)) => {
66+
self.last_activity_at = Some(block.slot);
67+
self.context.publish(&self.topic, message).await
68+
}
69+
_ => self.context.publish(&self.topic, message).await,
70+
}
71+
}
72+
}

common/src/commands/chain_sync.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use crate::{BlockHash, Slot};
1+
use crate::Point;
22

33
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
44
pub enum ChainSyncCommand {
5-
FindIntersect { slot: Slot, hash: BlockHash },
5+
FindIntersect(Point),
66
}

common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
pub mod address;
44
pub mod calculations;
5+
pub mod caryatid;
56
pub mod cbor;
67
pub mod cip19;
78
pub mod commands;

common/src/messages.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
//! Definition of Acropolis messages
22
3-
// We don't use these messages in the acropolis_common crate itself
4-
#![allow(dead_code)]
5-
63
use crate::commands::chain_sync::ChainSyncCommand;
74
use crate::commands::transactions::{TransactionsCommand, TransactionsCommandResponse};
85
use crate::genesis_values::GenesisValues;
@@ -45,6 +42,13 @@ pub struct RawBlockMessage {
4542
pub body: Vec<u8>,
4643
}
4744

45+
/// Rollback message
46+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
47+
pub enum StateTransitionMessage {
48+
/// The chain has been rolled back to a specific point
49+
Rollback(Point),
50+
}
51+
4852
/// Snapshot completion message
4953
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
5054
pub struct SnapshotCompleteMessage {
@@ -303,6 +307,7 @@ pub struct SPOStateMessage {
303307
#[allow(clippy::large_enum_variant)]
304308
pub enum CardanoMessage {
305309
BlockAvailable(RawBlockMessage), // Block body available
310+
StateTransition(StateTransitionMessage), // Our position on the chain has changed
306311
BlockValidation(ValidationStatus), // Result of a block validation
307312
SnapshotComplete, // Mithril snapshot loaded
308313
ReceivedTxs(RawTxsMessage), // Transaction available

common/src/types.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,17 @@ impl TxOutRef {
724724
/// Slot
725725
pub type Slot = u64;
726726

727+
/// Point on the chain
728+
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq)]
729+
pub enum Point {
730+
#[default]
731+
Origin,
732+
Specific {
733+
hash: BlockHash,
734+
slot: Slot,
735+
},
736+
}
737+
727738
/// Amount of Ada, in Lovelace
728739
pub type Lovelace = u64;
729740
pub type LovelaceDelta = i64;

modules/accounts_state/src/accounts_state.rs

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
//! Manages stake and reward accounts state
33
44
use acropolis_common::{
5-
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse},
5+
caryatid::SubscriptionExt,
6+
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse, StateTransitionMessage},
67
queries::accounts::{DrepDelegators, PoolDelegators, DEFAULT_ACCOUNTS_QUERY_TOPIC},
78
state_history::{StateHistory, StateHistoryStore},
89
BlockInfo, BlockStatus,
@@ -120,17 +121,13 @@ impl AccountsState {
120121
// Get a mutable state
121122
let mut state = history.lock().await.get_current_state();
122123

123-
// Read per-block topics in parallel
124-
let certs_message_f = certs_subscription.read();
125-
let stake_message_f = stake_subscription.read();
126-
let withdrawals_message_f = withdrawals_subscription.read();
127124
let mut current_block: Option<BlockInfo> = None;
128125

129126
// Use certs_message as the synchroniser, but we have to handle it after the
130127
// epoch things, because they apply to the new epoch, not the last
131-
let (_, certs_message) = certs_message_f.await?;
128+
let (_, certs_message) = certs_subscription.read().await?;
132129
let new_epoch = match certs_message.as_ref() {
133-
Message::Cardano((block_info, _)) => {
130+
Message::Cardano((block_info, CardanoMessage::TxCertificates(_))) => {
134131
// Handle rollbacks on this topic only
135132
if block_info.status == BlockStatus::RolledBack {
136133
state = history.lock().await.get_rolled_back_state(block_info.number);
@@ -139,6 +136,16 @@ impl AccountsState {
139136
current_block = Some(block_info.clone());
140137
block_info.new_epoch && block_info.epoch > 0
141138
}
139+
Message::Cardano((
140+
_,
141+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
142+
)) => {
143+
drep_publisher.publish_rollback(certs_message.clone()).await?;
144+
spo_publisher.publish_rollback(certs_message.clone()).await?;
145+
spo_rewards_publisher.publish_rollback(certs_message.clone()).await?;
146+
stake_reward_deltas_publisher.publish_rollback(certs_message.clone()).await?;
147+
false
148+
}
142149
_ => false,
143150
};
144151

@@ -149,18 +156,13 @@ impl AccountsState {
149156

150157
// Read from epoch-boundary messages only when it's a new epoch
151158
if new_epoch {
152-
let dreps_message_f = drep_state_subscription.read();
153-
let spos_message_f = spos_subscription.read();
154-
let ea_message_f = ea_subscription.read();
155-
let params_message_f = parameters_subscription.read();
156-
157159
let spdd_store_guard = match spdd_store.as_ref() {
158160
Some(s) => Some(s.lock().await),
159161
None => None,
160162
};
161163

162164
// Handle DRep
163-
let (_, message) = dreps_message_f.await?;
165+
let (_, message) = drep_state_subscription.read_ignoring_rollbacks().await?;
164166
match message.as_ref() {
165167
Message::Cardano((block_info, CardanoMessage::DRepState(dreps_msg))) => {
166168
let span = info_span!(
@@ -184,7 +186,7 @@ impl AccountsState {
184186
}
185187

186188
// Handle SPOs
187-
let (_, message) = spos_message_f.await?;
189+
let (_, message) = spos_subscription.read_ignoring_rollbacks().await?;
188190
match message.as_ref() {
189191
Message::Cardano((block_info, CardanoMessage::SPOState(spo_msg))) => {
190192
let span =
@@ -219,7 +221,7 @@ impl AccountsState {
219221
_ => error!("Unexpected message type: {message:?}"),
220222
}
221223

222-
let (_, message) = params_message_f.await?;
224+
let (_, message) = parameters_subscription.read_ignoring_rollbacks().await?;
223225
match message.as_ref() {
224226
Message::Cardano((block_info, CardanoMessage::ProtocolParams(params_msg))) => {
225227
let span = info_span!(
@@ -241,7 +243,7 @@ impl AccountsState {
241243
}
242244

243245
// Handle epoch activity
244-
let (_, message) = ea_message_f.await?;
246+
let (_, message) = ea_subscription.read_ignoring_rollbacks().await?;
245247
match message.as_ref() {
246248
Message::Cardano((block_info, CardanoMessage::EpochActivity(ea_msg))) => {
247249
let span = info_span!(
@@ -297,11 +299,18 @@ impl AccountsState {
297299
.await;
298300
}
299301

302+
Message::Cardano((
303+
_,
304+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
305+
)) => {
306+
// Ignore this, we already handled rollbacks
307+
}
308+
300309
_ => error!("Unexpected message type: {certs_message:?}"),
301310
}
302311

303312
// Handle withdrawals
304-
let (_, message) = withdrawals_message_f.await?;
313+
let (_, message) = withdrawals_subscription.read_ignoring_rollbacks().await?;
305314
match message.as_ref() {
306315
Message::Cardano((block_info, CardanoMessage::Withdrawals(withdrawals_msg))) => {
307316
let span = info_span!(
@@ -323,7 +332,7 @@ impl AccountsState {
323332
}
324333

325334
// Handle stake address deltas
326-
let (_, message) = stake_message_f.await?;
335+
let (_, message) = stake_subscription.read_ignoring_rollbacks().await?;
327336
match message.as_ref() {
328337
Message::Cardano((block_info, CardanoMessage::StakeAddressDeltas(deltas_msg))) => {
329338
let span = info_span!(
Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use acropolis_common::caryatid::RollbackAwarePublisher;
12
use acropolis_common::messages::{
23
CardanoMessage, DRepDelegationDistribution, DRepStakeDistributionMessage, Message,
34
};
@@ -6,18 +7,12 @@ use caryatid_sdk::Context;
67
use std::sync::Arc;
78

89
/// Message publisher for DRep Delegation Distribution (DRDD)
9-
pub struct DRepDistributionPublisher {
10-
/// Module context
11-
context: Arc<Context<Message>>,
12-
13-
/// Topic to publish on
14-
topic: String,
15-
}
10+
pub struct DRepDistributionPublisher(RollbackAwarePublisher<Message>);
1611

1712
impl DRepDistributionPublisher {
1813
/// Construct with context and topic to publish on
1914
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
20-
Self { context, topic }
15+
Self(RollbackAwarePublisher::new(context, topic))
2116
}
2217

2318
/// Publish the DRep Delegation Distribution
@@ -26,18 +21,19 @@ impl DRepDistributionPublisher {
2621
block: &BlockInfo,
2722
drdd: DRepDelegationDistribution,
2823
) -> anyhow::Result<()> {
29-
self.context
30-
.message_bus
31-
.publish(
32-
&self.topic,
33-
Arc::new(Message::Cardano((
34-
block.clone(),
35-
CardanoMessage::DRepStakeDistribution(DRepStakeDistributionMessage {
36-
epoch: block.epoch,
37-
drdd,
38-
}),
39-
))),
40-
)
24+
self.0
25+
.publish(Arc::new(Message::Cardano((
26+
block.clone(),
27+
CardanoMessage::DRepStakeDistribution(DRepStakeDistributionMessage {
28+
epoch: block.epoch,
29+
drdd,
30+
}),
31+
))))
4132
.await
4233
}
34+
35+
/// Publish a rollback message, if we have anything to roll back
36+
pub async fn publish_rollback(&mut self, message: Arc<Message>) -> anyhow::Result<()> {
37+
self.0.publish(message).await
38+
}
4339
}
Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,17 @@
1+
use acropolis_common::caryatid::RollbackAwarePublisher;
12
use acropolis_common::messages::{CardanoMessage, Message, SPOStakeDistributionMessage};
23
use acropolis_common::{BlockInfo, DelegatedStake, PoolId};
34
use caryatid_sdk::Context;
45
use std::collections::BTreeMap;
56
use std::sync::Arc;
67

78
/// Message publisher for Stake Pool Delegation Distribution (SPDD)
8-
pub struct SPODistributionPublisher {
9-
/// Module context
10-
context: Arc<Context<Message>>,
11-
12-
/// Topic to publish on
13-
topic: String,
14-
}
9+
pub struct SPODistributionPublisher(RollbackAwarePublisher<Message>);
1510

1611
impl SPODistributionPublisher {
1712
/// Construct with context and topic to publish on
1813
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
19-
Self { context, topic }
14+
Self(RollbackAwarePublisher::new(context, topic))
2015
}
2116

2217
/// Publish the SPDD
@@ -25,18 +20,19 @@ impl SPODistributionPublisher {
2520
block: &BlockInfo,
2621
spos: BTreeMap<PoolId, DelegatedStake>,
2722
) -> anyhow::Result<()> {
28-
self.context
29-
.message_bus
30-
.publish(
31-
&self.topic,
32-
Arc::new(Message::Cardano((
33-
block.clone(),
34-
CardanoMessage::SPOStakeDistribution(SPOStakeDistributionMessage {
35-
epoch: block.epoch - 1, // End of the previous epoch
36-
spos: spos.into_iter().collect(),
37-
}),
38-
))),
39-
)
23+
self.0
24+
.publish(Arc::new(Message::Cardano((
25+
block.clone(),
26+
CardanoMessage::SPOStakeDistribution(SPOStakeDistributionMessage {
27+
epoch: block.epoch - 1, // End of the previous epoch
28+
spos: spos.into_iter().collect(),
29+
}),
30+
))))
4031
.await
4132
}
33+
34+
/// Publish a rollback message, if we have anything to roll back
35+
pub async fn publish_rollback(&mut self, message: Arc<Message>) -> anyhow::Result<()> {
36+
self.0.publish(message).await
37+
}
4238
}

0 commit comments

Comments
 (0)