Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ pub struct DRepStakeDistributionMessage {
pub dreps: Vec<(DRepCredential, Lovelace)>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SPOStakeDistributionMessage {
/// Epoch which has ended
pub epoch: u64,

/// SPO stake distribution by operator ID
pub spos: Vec<(KeyHash, Lovelace)>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProtocolParamsMessage {
pub params: ProtocolParams,
Expand Down Expand Up @@ -192,6 +201,7 @@ pub enum CardanoMessage {

// Stake distribution info
DRepStakeDistribution(DRepStakeDistributionMessage), // Info about drep stake
SPOStakeDistribution(SPOStakeDistributionMessage), // SPO delegation distribution (SPDD)
StakeAddressDeltas(StakeAddressDeltasMessage), // Stake part of address deltas
}

Expand Down
26 changes: 21 additions & 5 deletions modules/accounts_state/src/accounts_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use tracing::{error, info};

mod drep_distribution_publisher;
use drep_distribution_publisher::DRepDistributionPublisher;
mod spo_distribution_publisher;
use spo_distribution_publisher::SPODistributionPublisher;
mod state;
use state::State;

Expand All @@ -29,6 +31,7 @@ const DEFAULT_POT_DELTAS_TOPIC: &str = "cardano.pot.deltas";
const DEFAULT_STAKE_DELTAS_TOPIC: &str = "cardano.stake.deltas";
const DEFAULT_DREP_STATE_TOPIC: &str = "cardano.drep.state";
const DEFAULT_DREP_DISTRIBUTION_TOPIC: &str = "cardano.drep.distribution";
const DEFAULT_SPO_DISTRIBUTION_TOPIC: &str = "cardano.spo.distribution";
const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters";

const DEFAULT_HANDLE_STAKE_TOPIC: &str = "rest.get.stake";
Expand All @@ -48,7 +51,8 @@ impl AccountsState {
/// Async run loop
async fn run(
history: Arc<Mutex<StateHistory<State>>>,
mut publisher: DRepDistributionPublisher,
mut drep_publisher: DRepDistributionPublisher,
mut spo_publisher: SPODistributionPublisher,
mut spos_subscription: Box<dyn Subscription<Message>>,
mut ea_subscription: Box<dyn Subscription<Message>>,
mut certs_subscription: Box<dyn Subscription<Message>>,
Expand Down Expand Up @@ -184,8 +188,7 @@ impl AccountsState {
state.handle_drep_state(&dreps_msg);

let drdd = state.generate_drdd();

if let Err(e) = publisher.publish_stake(block_info, drdd).await {
if let Err(e) = drep_publisher.publish_drdd(block_info, drdd).await {
error!("Error publishing drep voting stake distribution: {e:#}")
}
}
Expand All @@ -211,6 +214,11 @@ impl AccountsState {
.handle_spo_state(spo_msg)
.inspect_err(|e| error!("SPOState handling error: {e:#}"))
.ok();

let spdd = state.generate_spdd();
if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await {
error!("Error publishing SPO stake distribution: {e:#}")
}
}

_ => error!("Unexpected message type: {message:?}"),
Expand Down Expand Up @@ -316,6 +324,10 @@ impl AccountsState {
.get_string("publish-drep-distribution-topic")
.unwrap_or(DEFAULT_DREP_DISTRIBUTION_TOPIC.to_string());

let spo_distribution_topic = config
.get_string("publish-spo-distribution-topic")
.unwrap_or(DEFAULT_SPO_DISTRIBUTION_TOPIC.to_string());

// REST handler topics
let handle_stake_topic = config
.get_string("handle-stake-topic")
Expand Down Expand Up @@ -462,7 +474,10 @@ impl AccountsState {
}
});

let publisher = DRepDistributionPublisher::new(context.clone(), drep_distribution_topic);
let drep_publisher = DRepDistributionPublisher::new(context.clone(),
drep_distribution_topic);
let spo_publisher = SPODistributionPublisher::new(context.clone(),
spo_distribution_topic);

// Subscribe
let spos_subscription = context.subscribe(&spo_state_topic).await?;
Expand All @@ -478,7 +493,8 @@ impl AccountsState {
context.run(async move {
Self::run(
history,
publisher,
drep_publisher,
spo_publisher,
spos_subscription,
ea_subscription,
certs_subscription,
Expand Down
13 changes: 8 additions & 5 deletions modules/accounts_state/src/drep_distribution_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;

use crate::state::DRepDelegationDistribution;

/// Message publisher for DRep Delegation Distribution (DRDD)
pub struct DRepDistributionPublisher {
/// Module context
context: Arc<Context<Message>>,
Expand All @@ -14,14 +15,16 @@ pub struct DRepDistributionPublisher {
}

impl DRepDistributionPublisher {
/// Construct with context and topic to publish on
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
Self { context, topic }
}

pub async fn publish_stake(
/// Publish the DRep Delegation Distribution
pub async fn publish_drdd(
&mut self,
block: &BlockInfo,
s: DRepDelegationDistribution,
drdd: DRepDelegationDistribution,
) -> anyhow::Result<()> {
self.context
.message_bus
Expand All @@ -31,9 +34,9 @@ impl DRepDistributionPublisher {
block.clone(),
CardanoMessage::DRepStakeDistribution(DRepStakeDistributionMessage {
epoch: block.epoch,
abstain: s.abstain,
no_confidence: s.no_confidence,
dreps: s.dreps,
abstain: drdd.abstain,
no_confidence: drdd.no_confidence,
dreps: drdd.dreps,
}),
))),
)
Expand Down
42 changes: 42 additions & 0 deletions modules/accounts_state/src/spo_distribution_publisher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use acropolis_common::messages::{CardanoMessage, SPOStakeDistributionMessage, Message};
use acropolis_common::{KeyHash, BlockInfo};
use caryatid_sdk::Context;
use std::collections::BTreeMap;
use std::sync::Arc;

/// Message publisher for Stake Pool Delegation Distribution (SPDD)
pub struct SPODistributionPublisher {
/// Module context
context: Arc<Context<Message>>,

/// Topic to publish on
topic: String,
}

impl SPODistributionPublisher {
/// Construct with context and topic to publish on
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
Self { context, topic }
}

/// Publish the SPDD
pub async fn publish_spdd(
&mut self,
block: &BlockInfo,
spos: BTreeMap<KeyHash, u64>,
) -> anyhow::Result<()> {
self.context
.message_bus
.publish(
&self.topic,
Arc::new(Message::Cardano((
block.clone(),
CardanoMessage::SPOStakeDistribution(SPOStakeDistributionMessage {
epoch: block.epoch,
spos: spos.into_iter().collect(),
}),
))),
)
.await
}
}
2 changes: 1 addition & 1 deletion processes/omnibus/omnibus.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ port = 4340

[module.spy]
# Enable for message spying
#topic = "cardano.drep.state"
#topic = "cardano.spo.distribution"

[startup]
topic = "cardano.sequence.start"
Expand Down