Skip to content

Commit 61cd082

Browse files
committed
fix: remove duplicate code
1 parent 48d0c2a commit 61cd082

File tree

28 files changed

+185
-372
lines changed

28 files changed

+185
-372
lines changed

common/src/caryatid.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use std::sync::Arc;
2+
3+
use anyhow::Result;
4+
use caryatid_sdk::{async_trait, Context, MessageBounds, Subscription};
5+
6+
use crate::messages::{CardanoMessage, Message};
7+
8+
#[async_trait]
9+
pub trait SubscriptionExt<M: MessageBounds> {
10+
async fn read_ignoring_rollbacks(&mut self) -> Result<(String, Arc<M>)>;
11+
}
12+
13+
#[async_trait]
14+
impl SubscriptionExt<Message> for Box<dyn Subscription<Message>> {
15+
async fn read_ignoring_rollbacks(&mut self) -> Result<(String, Arc<Message>)> {
16+
loop {
17+
let (stream, message) = self.read().await?;
18+
if matches!(
19+
message.as_ref(),
20+
Message::Cardano((_, CardanoMessage::Rollback(_)))
21+
) {
22+
continue;
23+
}
24+
break Ok((stream, message));
25+
}
26+
}
27+
}
28+
29+
/// A utility to publish messages, which will only publish rollback messages if some work has been rolled back
30+
pub struct RollbackAwarePublisher<M: MessageBounds> {
31+
/// Module context
32+
context: Arc<Context<M>>,
33+
34+
/// Topic to publish on
35+
topic: String,
36+
37+
// When did we publish our last non-rollback message
38+
last_activity_at: Option<u64>,
39+
}
40+
41+
impl RollbackAwarePublisher<Message> {
42+
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
43+
Self {
44+
context,
45+
topic,
46+
last_activity_at: None,
47+
}
48+
}
49+
50+
pub async fn publish(&mut self, message: Arc<Message>) -> Result<()> {
51+
match message.as_ref() {
52+
Message::Cardano((block, CardanoMessage::Rollback(_))) => {
53+
if self.last_activity_at.is_some_and(|slot| slot >= block.slot) {
54+
self.last_activity_at = None;
55+
self.context.publish(&self.topic, message).await?;
56+
}
57+
Ok(())
58+
}
59+
Message::Cardano((block, _)) => {
60+
self.last_activity_at = Some(block.slot);
61+
self.context.publish(&self.topic, message).await
62+
}
63+
_ => self.context.publish(&self.topic, message).await,
64+
}
65+
}
66+
}

common/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
pub mod address;
44
pub mod calculations;
5+
pub mod caryatid;
56
pub mod cbor;
67
pub mod cip19;
78
pub mod commands;
@@ -22,7 +23,6 @@ pub mod serialization;
2223
pub mod snapshot;
2324
pub mod stake_addresses;
2425
pub mod state_history;
25-
pub mod subscription;
2626
pub mod types;
2727
pub mod upstream_cache;
2828
pub mod validation;

common/src/subscription.rs

Lines changed: 0 additions & 27 deletions
This file was deleted.

modules/accounts_state/src/accounts_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
//! Manages stake and reward accounts state
33
44
use acropolis_common::{
5+
caryatid::SubscriptionExt,
56
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse},
67
queries::accounts::{DrepDelegators, PoolDelegators, DEFAULT_ACCOUNTS_QUERY_TOPIC},
78
state_history::{StateHistory, StateHistoryStore},
8-
subscription::SubscriptionExt,
99
BlockInfo, BlockStatus,
1010
};
1111
use anyhow::Result;
Lines changed: 12 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use acropolis_common::caryatid::RollbackAwarePublisher;
12
use acropolis_common::messages::{
23
CardanoMessage, DRepDelegationDistribution, DRepStakeDistributionMessage, Message,
34
};
@@ -6,25 +7,12 @@ use caryatid_sdk::Context;
67
use std::sync::Arc;
78

89
/// Message publisher for DRep Delegation Distribution (DRDD)
9-
pub struct DRepDistributionPublisher {
10-
/// Module context
11-
context: Arc<Context<Message>>,
12-
13-
/// Topic to publish on
14-
topic: String,
15-
16-
// When did we publish our last non-rollback message
17-
last_activity_at: Option<u64>,
18-
}
10+
pub struct DRepDistributionPublisher(RollbackAwarePublisher<Message>);
1911

2012
impl DRepDistributionPublisher {
2113
/// Construct with context and topic to publish on
2214
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
23-
Self {
24-
context,
25-
topic,
26-
last_activity_at: None,
27-
}
15+
Self(RollbackAwarePublisher::new(context, topic))
2816
}
2917

3018
/// Publish the DRep Delegation Distribution
@@ -33,31 +21,19 @@ impl DRepDistributionPublisher {
3321
block: &BlockInfo,
3422
drdd: DRepDelegationDistribution,
3523
) -> anyhow::Result<()> {
36-
self.last_activity_at = Some(block.slot);
37-
self.context
38-
.message_bus
39-
.publish(
40-
&self.topic,
41-
Arc::new(Message::Cardano((
42-
block.clone(),
43-
CardanoMessage::DRepStakeDistribution(DRepStakeDistributionMessage {
44-
epoch: block.epoch,
45-
drdd,
46-
}),
47-
))),
48-
)
24+
self.0
25+
.publish(Arc::new(Message::Cardano((
26+
block.clone(),
27+
CardanoMessage::DRepStakeDistribution(DRepStakeDistributionMessage {
28+
epoch: block.epoch,
29+
drdd,
30+
}),
31+
))))
4932
.await
5033
}
5134

5235
/// Publish a rollback message, if we have anything to roll back
5336
pub async fn publish_rollback(&mut self, message: Arc<Message>) -> anyhow::Result<()> {
54-
let Message::Cardano((block_info, CardanoMessage::Rollback(_))) = message.as_ref() else {
55-
return Ok(());
56-
};
57-
if self.last_activity_at.is_none_or(|slot| slot < block_info.slot) {
58-
return Ok(());
59-
}
60-
self.last_activity_at = None;
61-
self.context.message_bus.publish(&self.topic, message).await
37+
self.0.publish(message).await
6238
}
6339
}
Lines changed: 12 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,17 @@
1+
use acropolis_common::caryatid::RollbackAwarePublisher;
12
use acropolis_common::messages::{CardanoMessage, Message, SPOStakeDistributionMessage};
23
use acropolis_common::{BlockInfo, DelegatedStake, PoolId};
34
use caryatid_sdk::Context;
45
use std::collections::BTreeMap;
56
use std::sync::Arc;
67

78
/// Message publisher for Stake Pool Delegation Distribution (SPDD)
8-
pub struct SPODistributionPublisher {
9-
/// Module context
10-
context: Arc<Context<Message>>,
11-
12-
/// Topic to publish on
13-
topic: String,
14-
15-
// When did we publish our last non-rollback message
16-
last_activity_at: Option<u64>,
17-
}
9+
pub struct SPODistributionPublisher(RollbackAwarePublisher<Message>);
1810

1911
impl SPODistributionPublisher {
2012
/// Construct with context and topic to publish on
2113
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
22-
Self {
23-
context,
24-
topic,
25-
last_activity_at: None,
26-
}
14+
Self(RollbackAwarePublisher::new(context, topic))
2715
}
2816

2917
/// Publish the SPDD
@@ -32,31 +20,19 @@ impl SPODistributionPublisher {
3220
block: &BlockInfo,
3321
spos: BTreeMap<PoolId, DelegatedStake>,
3422
) -> anyhow::Result<()> {
35-
self.last_activity_at = Some(block.slot);
36-
self.context
37-
.message_bus
38-
.publish(
39-
&self.topic,
40-
Arc::new(Message::Cardano((
41-
block.clone(),
42-
CardanoMessage::SPOStakeDistribution(SPOStakeDistributionMessage {
43-
epoch: block.epoch - 1, // End of the previous epoch
44-
spos: spos.into_iter().collect(),
45-
}),
46-
))),
47-
)
23+
self.0
24+
.publish(Arc::new(Message::Cardano((
25+
block.clone(),
26+
CardanoMessage::SPOStakeDistribution(SPOStakeDistributionMessage {
27+
epoch: block.epoch - 1, // End of the previous epoch
28+
spos: spos.into_iter().collect(),
29+
}),
30+
))))
4831
.await
4932
}
5033

5134
/// Publish a rollback message, if we have anything to roll back
5235
pub async fn publish_rollback(&mut self, message: Arc<Message>) -> anyhow::Result<()> {
53-
let Message::Cardano((block_info, CardanoMessage::Rollback(_))) = message.as_ref() else {
54-
return Ok(());
55-
};
56-
if self.last_activity_at.is_none_or(|slot| slot < block_info.slot) {
57-
return Ok(());
58-
}
59-
self.last_activity_at = None;
60-
self.context.message_bus.publish(&self.topic, message).await
36+
self.0.publish(message).await
6137
}
6238
}
Lines changed: 12 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,16 @@
1+
use acropolis_common::caryatid::RollbackAwarePublisher;
12
use acropolis_common::messages::{CardanoMessage, Message, SPORewardsMessage};
23
use acropolis_common::{BlockInfo, PoolId, SPORewards};
34
use caryatid_sdk::Context;
45
use std::sync::Arc;
56

67
/// Message publisher for Stake Pool Delegation Distribution (SPDD)
7-
pub struct SPORewardsPublisher {
8-
/// Module context
9-
context: Arc<Context<Message>>,
10-
11-
/// Topic to publish on
12-
topic: String,
13-
14-
// When did we publish our last non-rollback message
15-
last_activity_at: Option<u64>,
16-
}
8+
pub struct SPORewardsPublisher(RollbackAwarePublisher<Message>);
179

1810
impl SPORewardsPublisher {
1911
/// Construct with context and topic to publish on
2012
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
21-
Self {
22-
context,
23-
topic,
24-
last_activity_at: None,
25-
}
13+
Self(RollbackAwarePublisher::new(context, topic))
2614
}
2715

2816
/// Publish the SPO rewards
@@ -31,31 +19,19 @@ impl SPORewardsPublisher {
3119
block: &BlockInfo,
3220
spo_rewards: Vec<(PoolId, SPORewards)>,
3321
) -> anyhow::Result<()> {
34-
self.last_activity_at = Some(block.slot);
35-
self.context
36-
.message_bus
37-
.publish(
38-
&self.topic,
39-
Arc::new(Message::Cardano((
40-
block.clone(),
41-
CardanoMessage::SPORewards(SPORewardsMessage {
42-
epoch: block.epoch - 1, // End of previous epoch
43-
spos: spo_rewards.into_iter().collect(),
44-
}),
45-
))),
46-
)
22+
self.0
23+
.publish(Arc::new(Message::Cardano((
24+
block.clone(),
25+
CardanoMessage::SPORewards(SPORewardsMessage {
26+
epoch: block.epoch - 1, // End of previous epoch
27+
spos: spo_rewards.into_iter().collect(),
28+
}),
29+
))))
4730
.await
4831
}
4932

5033
/// Publish a rollback message, if we have anything to roll back
5134
pub async fn publish_rollback(&mut self, message: Arc<Message>) -> anyhow::Result<()> {
52-
let Message::Cardano((block_info, CardanoMessage::Rollback(_))) = message.as_ref() else {
53-
return Ok(());
54-
};
55-
if self.last_activity_at.is_none_or(|slot| slot < block_info.slot) {
56-
return Ok(());
57-
}
58-
self.last_activity_at = None;
59-
self.context.message_bus.publish(&self.topic, message).await
35+
self.0.publish(message).await
6036
}
6137
}

0 commit comments

Comments
 (0)