Skip to content

Commit a763b47

Browse files
committed
feat: replace Rollback with StateTransition
1 parent 61cd082 commit a763b47

File tree

13 files changed

+81
-29
lines changed

13 files changed

+81
-29
lines changed

common/src/caryatid.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use anyhow::Result;
44
use caryatid_sdk::{async_trait, Context, MessageBounds, Subscription};
55

6-
use crate::messages::{CardanoMessage, Message};
6+
use crate::messages::{CardanoMessage, Message, StateTransitionMessage};
77

88
#[async_trait]
99
pub trait SubscriptionExt<M: MessageBounds> {
@@ -17,7 +17,10 @@ impl SubscriptionExt<Message> for Box<dyn Subscription<Message>> {
1717
let (stream, message) = self.read().await?;
1818
if matches!(
1919
message.as_ref(),
20-
Message::Cardano((_, CardanoMessage::Rollback(_)))
20+
Message::Cardano((
21+
_,
22+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_))
23+
))
2124
) {
2225
continue;
2326
}
@@ -49,7 +52,10 @@ impl RollbackAwarePublisher<Message> {
4952

5053
pub async fn publish(&mut self, message: Arc<Message>) -> Result<()> {
5154
match message.as_ref() {
52-
Message::Cardano((block, CardanoMessage::Rollback(_))) => {
55+
Message::Cardano((
56+
block,
57+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
58+
)) => {
5359
if self.last_activity_at.is_some_and(|slot| slot >= block.slot) {
5460
self.last_activity_at = None;
5561
self.context.publish(&self.topic, message).await?;

common/src/messages.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ pub struct RawBlockMessage {
4444

4545
/// Rollback message
4646
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
47-
pub struct RollbackMessage {
48-
/// Point which the chain has rolled back to
49-
pub point: Point,
47+
pub enum StateTransitionMessage {
48+
/// The chain has been rolled back to a specific point
49+
Rollback(Point),
5050
}
5151

5252
/// Snapshot completion message
@@ -307,7 +307,7 @@ pub struct SPOStateMessage {
307307
#[allow(clippy::large_enum_variant)]
308308
pub enum CardanoMessage {
309309
BlockAvailable(RawBlockMessage), // Block body available
310-
Rollback(RollbackMessage), // Chain has been rolled back
310+
StateTransition(StateTransitionMessage), // Our position on the chain has changed
311311
BlockValidation(ValidationStatus), // Result of a block validation
312312
SnapshotComplete, // Mithril snapshot loaded
313313
ReceivedTxs(RawTxsMessage), // Transaction available

modules/accounts_state/src/accounts_state.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
44
use acropolis_common::{
55
caryatid::SubscriptionExt,
6-
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse},
6+
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse, StateTransitionMessage},
77
queries::accounts::{DrepDelegators, PoolDelegators, DEFAULT_ACCOUNTS_QUERY_TOPIC},
88
state_history::{StateHistory, StateHistoryStore},
99
BlockInfo, BlockStatus,
@@ -136,7 +136,10 @@ impl AccountsState {
136136
current_block = Some(block_info.clone());
137137
block_info.new_epoch && block_info.epoch > 0
138138
}
139-
Message::Cardano((_, CardanoMessage::Rollback(_))) => {
139+
Message::Cardano((
140+
_,
141+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
142+
)) => {
140143
drep_publisher.publish_rollback(certs_message.clone()).await?;
141144
spo_publisher.publish_rollback(certs_message.clone()).await?;
142145
spo_rewards_publisher.publish_rollback(certs_message.clone()).await?;
@@ -296,7 +299,10 @@ impl AccountsState {
296299
.await;
297300
}
298301

299-
Message::Cardano((_, CardanoMessage::Rollback(_))) => {
302+
Message::Cardano((
303+
_,
304+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
305+
)) => {
300306
// Ignore this, we already handled rollbacks
301307
}
302308

modules/block_unpacker/src/block_unpacker.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Acropolis Block unpacker module for Caryatid
22
//! Unpacks block bodies into transactions
33
4-
use acropolis_common::messages::{CardanoMessage, Message, RawTxsMessage};
4+
use acropolis_common::messages::{CardanoMessage, Message, RawTxsMessage, StateTransitionMessage};
55
use anyhow::Result;
66
use caryatid_sdk::{module, Context};
77
use config::Config;
@@ -81,7 +81,10 @@ impl BlockUnpacker {
8181
}
8282
}
8383

84-
Message::Cardano((_, CardanoMessage::Rollback(_))) => {
84+
Message::Cardano((
85+
_,
86+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
87+
)) => {
8588
// forward the rollback downstream
8689
context
8790
.message_bus

modules/consensus/src/consensus.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//! Maintains a favoured chain based on offered options from multiple sources
33
44
use acropolis_common::{
5-
messages::{CardanoMessage, Message},
5+
messages::{CardanoMessage, Message, StateTransitionMessage},
66
validation::ValidationStatus,
77
};
88
use anyhow::Result;
@@ -134,7 +134,10 @@ impl Consensus {
134134
.await;
135135
}
136136

137-
Message::Cardano((_, CardanoMessage::Rollback(_))) => {
137+
Message::Cardano((
138+
_,
139+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
140+
)) => {
138141
// Send rollback to all validators and state modules
139142
context
140143
.message_bus

modules/drep_state/src/drep_state.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//! Accepts certificate events and derives the DRep State in memory
33
44
use acropolis_common::caryatid::SubscriptionExt;
5+
use acropolis_common::messages::StateTransitionMessage;
56
use acropolis_common::queries::errors::QueryError;
67
use acropolis_common::{
78
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse},
@@ -91,7 +92,10 @@ impl DRepState {
9192
current_block = Some(block_info.clone());
9293
block_info.new_epoch && block_info.epoch > 0
9394
}
94-
Message::Cardano((_, CardanoMessage::Rollback(_))) => {
95+
Message::Cardano((
96+
_,
97+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
98+
)) => {
9599
drep_state_publisher.publish_rollback(certs_message.clone()).await?;
96100
false
97101
}
@@ -153,7 +157,10 @@ impl DRepState {
153157
.await;
154158
}
155159

156-
Message::Cardano((_, CardanoMessage::Rollback(_))) => {
160+
Message::Cardano((
161+
_,
162+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
163+
)) => {
157164
// Do nothing, we handled the rollback earlier
158165
}
159166

modules/epochs_state/src/epochs_state.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
44
use acropolis_common::{
55
caryatid::SubscriptionExt,
6-
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse},
6+
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse, StateTransitionMessage},
77
queries::{
88
epochs::{
99
EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch, DEFAULT_EPOCHS_QUERY_TOPIC,
@@ -136,7 +136,10 @@ impl EpochsState {
136136
});
137137
}
138138

139-
Message::Cardano((_, CardanoMessage::Rollback(_))) => {
139+
Message::Cardano((
140+
_,
141+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
142+
)) => {
140143
// publish epoch activity rollback message
141144
epoch_activity_publisher.publish_rollback(message).await.unwrap_or_else(|e| {
142145
error!("Failed to publish epoch activity rollback: {e}")

modules/governance_state/src/governance_state.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//! Accepts certificate events and derives the Governance State in memory
33
44
use acropolis_common::caryatid::SubscriptionExt;
5+
use acropolis_common::messages::StateTransitionMessage;
56
use acropolis_common::queries::errors::QueryError;
67
use acropolis_common::{
78
messages::{
@@ -223,7 +224,10 @@ impl GovernanceState {
223224
Message::Cardano((blk, CardanoMessage::GovernanceProcedures(msg))) => {
224225
(blk.clone(), msg.clone())
225226
}
226-
Message::Cardano((_, CardanoMessage::Rollback(_))) => {
227+
Message::Cardano((
228+
_,
229+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
230+
)) => {
227231
let mut state = state.lock().await;
228232
state.publish_rollback(message).await?;
229233
continue;

modules/parameters_state/src/parameters_state.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Acropolis Parameter State module for Caryatid
22
//! Accepts certificate events and derives the Governance State in memory
33
4+
use acropolis_common::messages::StateTransitionMessage;
45
use acropolis_common::queries::errors::QueryError;
56
use acropolis_common::{
67
messages::{CardanoMessage, Message, ProtocolParamsMessage, StateQuery, StateQueryResponse},
@@ -145,7 +146,10 @@ impl ParametersState {
145146
.instrument(span)
146147
.await?;
147148
}
148-
Message::Cardano((_, CardanoMessage::Rollback(_))) => {
149+
Message::Cardano((
150+
_,
151+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
152+
)) => {
149153
// forward the rollback downstream
150154
config.context.publish(&config.protocol_parameters_topic, message).await?;
151155
}

modules/spo_state/src/spo_state.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//! Accepts certificate events and derives the SPO state in memory
33
44
use acropolis_common::caryatid::SubscriptionExt;
5+
use acropolis_common::messages::StateTransitionMessage;
56
use acropolis_common::queries::errors::QueryError;
67
use acropolis_common::{
78
ledger_state::SPOState as LedgerSPOState,
@@ -126,7 +127,10 @@ impl SPOState {
126127
block_info.new_epoch && block_info.epoch > 0
127128
}
128129

129-
Message::Cardano((_, CardanoMessage::Rollback(_))) => {
130+
Message::Cardano((
131+
_,
132+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
133+
)) => {
130134
spo_state_publisher.publish_rollback(certs_message.clone()).await?;
131135
false
132136
}
@@ -205,7 +209,10 @@ impl SPOState {
205209
.await;
206210
}
207211

208-
Message::Cardano((_, CardanoMessage::Rollback(_))) => {
212+
Message::Cardano((
213+
_,
214+
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
215+
)) => {
209216
// Do nothing, we handled rollback earlier
210217
}
211218

0 commit comments

Comments
 (0)