From 8bf6b3acbbfcec426d152c9919ff9f296207053c Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Wed, 2 Jul 2025 10:58:23 +0100 Subject: [PATCH 1/2] Publish SPDD from AccountsState Fixes #56 --- common/src/messages.rs | 10 +++++ modules/accounts_state/src/accounts_state.rs | 26 +++++++++--- .../src/drep_distribution_publisher.rs | 13 +++--- .../src/spo_distribution_publisher.rs | 42 +++++++++++++++++++ processes/omnibus/omnibus.toml | 2 +- 5 files changed, 82 insertions(+), 11 deletions(-) create mode 100644 modules/accounts_state/src/spo_distribution_publisher.rs diff --git a/common/src/messages.rs b/common/src/messages.rs index dbde997d..11527d09 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -141,6 +141,15 @@ pub struct DRepStakeDistributionMessage { pub dreps: Vec<(DRepCredential, Lovelace)>, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct SPOStakeDistributionMessage { + /// Epoch which has ended + pub epoch: u64, + + /// SPO stake distribution by operator ID + pub spos: Vec<(KeyHash, Lovelace)>, +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ProtocolParamsMessage { pub params: ProtocolParams, @@ -192,6 +201,7 @@ pub enum CardanoMessage { // Stake distribution info DRepStakeDistribution(DRepStakeDistributionMessage), // Info about drep stake + SPOStakeDistribution(SPOStakeDistributionMessage), // SPO delegation distribution (SPDD) StakeAddressDeltas(StakeAddressDeltasMessage), // Stake part of address deltas } diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 1535f2e8..e8e5d0d5 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -18,6 +18,8 @@ use tracing::{error, info}; mod drep_distribution_publisher; use drep_distribution_publisher::DRepDistributionPublisher; +mod spo_distribution_publisher; +use spo_distribution_publisher::SPODistributionPublisher; mod state; use state::State; @@ -29,6 +31,7 @@ const DEFAULT_POT_DELTAS_TOPIC: &str = "cardano.pot.deltas"; const DEFAULT_STAKE_DELTAS_TOPIC: &str = "cardano.stake.deltas"; const DEFAULT_DREP_STATE_TOPIC: &str = "cardano.drep.state"; const DEFAULT_DREP_DISTRIBUTION_TOPIC: &str = "cardano.drep.distribution"; +const DEFAULT_SPO_DISTRIBUTION_TOPIC: &str = "cardano.spo.distribution"; const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters"; const DEFAULT_HANDLE_STAKE_TOPIC: &str = "rest.get.stake"; @@ -48,7 +51,8 @@ impl AccountsState { /// Async run loop async fn run( history: Arc>>, - mut publisher: DRepDistributionPublisher, + mut drep_publisher: DRepDistributionPublisher, + mut spo_publisher: SPODistributionPublisher, mut spos_subscription: Box>, mut ea_subscription: Box>, mut certs_subscription: Box>, @@ -184,8 +188,7 @@ impl AccountsState { state.handle_drep_state(&dreps_msg); let drdd = state.generate_drdd(); - - if let Err(e) = publisher.publish_stake(block_info, drdd).await { + if let Err(e) = drep_publisher.publish_drdd(block_info, drdd).await { error!("Error publishing drep voting stake distribution: {e:#}") } } @@ -211,6 +214,11 @@ impl AccountsState { .handle_spo_state(spo_msg) .inspect_err(|e| error!("SPOState handling error: {e:#}")) .ok(); + + let spdd = state.generate_spdd(); + if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await { + error!("Error publishing SPO stake distribution: {e:#}") + } } _ => error!("Unexpected message type: {message:?}"), @@ -316,6 +324,10 @@ impl AccountsState { .get_string("publish-drep-distribution-topic") .unwrap_or(DEFAULT_DREP_DISTRIBUTION_TOPIC.to_string()); + let spo_distribution_topic = config + .get_string("publish-spo-distribution-topic") + .unwrap_or(DEFAULT_SPO_DISTRIBUTION_TOPIC.to_string()); + // REST handler topics let handle_stake_topic = config .get_string("handle-stake-topic") @@ -462,7 +474,10 @@ impl AccountsState { } }); - let publisher = DRepDistributionPublisher::new(context.clone(), drep_distribution_topic); + let drep_publisher = DRepDistributionPublisher::new(context.clone(), + drep_distribution_topic); + let spo_publisher = SPODistributionPublisher::new(context.clone(), + spo_distribution_topic); // Subscribe let spos_subscription = context.subscribe(&spo_state_topic).await?; @@ -478,7 +493,8 @@ impl AccountsState { context.run(async move { Self::run( history, - publisher, + drep_publisher, + spo_publisher, spos_subscription, ea_subscription, certs_subscription, diff --git a/modules/accounts_state/src/drep_distribution_publisher.rs b/modules/accounts_state/src/drep_distribution_publisher.rs index d3a1ad93..db48c6d5 100644 --- a/modules/accounts_state/src/drep_distribution_publisher.rs +++ b/modules/accounts_state/src/drep_distribution_publisher.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use crate::state::DRepDelegationDistribution; +/// Message publisher for DRep Delegation Distribution (DRDD) pub struct DRepDistributionPublisher { /// Module context context: Arc>, @@ -14,14 +15,16 @@ pub struct DRepDistributionPublisher { } impl DRepDistributionPublisher { + /// Construct with context and topic to publish on pub fn new(context: Arc>, topic: String) -> Self { Self { context, topic } } - pub async fn publish_stake( + /// Publish the DRep Delegation Distribution + pub async fn publish_drdd( &mut self, block: &BlockInfo, - s: DRepDelegationDistribution, + drdd: DRepDelegationDistribution, ) -> anyhow::Result<()> { self.context .message_bus @@ -31,9 +34,9 @@ impl DRepDistributionPublisher { block.clone(), CardanoMessage::DRepStakeDistribution(DRepStakeDistributionMessage { epoch: block.epoch, - abstain: s.abstain, - no_confidence: s.no_confidence, - dreps: s.dreps, + abstain: drdd.abstain, + no_confidence: drdd.no_confidence, + dreps: drdd.dreps, }), ))), ) diff --git a/modules/accounts_state/src/spo_distribution_publisher.rs b/modules/accounts_state/src/spo_distribution_publisher.rs new file mode 100644 index 00000000..f07615d7 --- /dev/null +++ b/modules/accounts_state/src/spo_distribution_publisher.rs @@ -0,0 +1,42 @@ +use acropolis_common::messages::{CardanoMessage, SPOStakeDistributionMessage, Message}; +use acropolis_common::{KeyHash, BlockInfo}; +use caryatid_sdk::Context; +use std::collections::BTreeMap; +use std::sync::Arc; + +/// Message publisher for Stake Pool Delegation Distribution (SPDD) +pub struct SPODistributionPublisher { + /// Module context + context: Arc>, + + /// Topic to publish on + topic: String, +} + +impl SPODistributionPublisher { + /// Construct with context and topic to publish on + pub fn new(context: Arc>, topic: String) -> Self { + Self { context, topic } + } + + /// Publish the SPDD + pub async fn publish_spdd( + &mut self, + block: &BlockInfo, + spos: BTreeMap, + ) -> anyhow::Result<()> { + self.context + .message_bus + .publish( + &self.topic, + Arc::new(Message::Cardano(( + block.clone(), + CardanoMessage::SPOStakeDistribution(SPOStakeDistributionMessage { + epoch: block.epoch, + spos: spos.into_iter().collect(), + }), + ))), + ) + .await + } +} diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 61421f37..70dcf595 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -49,7 +49,7 @@ port = 4340 [module.spy] # Enable for message spying -#topic = "cardano.drep.state" +topic = "cardano.spo.distribution" [startup] topic = "cardano.sequence.start" From 64280e7335bec49a72deaaea70812c0e75fa38eb Mon Sep 17 00:00:00 2001 From: Paul Clark Date: Wed, 2 Jul 2025 11:12:41 +0100 Subject: [PATCH 2/2] Disable spy on SPDD messages --- processes/omnibus/omnibus.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 70dcf595..4ceaf39b 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -49,7 +49,7 @@ port = 4340 [module.spy] # Enable for message spying -topic = "cardano.spo.distribution" +#topic = "cardano.spo.distribution" [startup] topic = "cardano.sequence.start"