diff --git a/README.md b/README.md index 9313fba..851c85c 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,10 @@ _Local Environment Config Values_ below. The below config values are global and apply to all environments. Environment specific settings are added to the `environments` array. +* **cluster** (`String`) + * The cluster value to include in messages such that Kafka event receivers have context + regarding updates (implemented for account updates only for now). + * possible values `mainnet|devnet|testnet` (default: `mainnet`) * **shutdown_timeout_ms** (`u64`) * Time the plugin is given to flush out all messages to Kafka * and gracefully shutdown upon exit request. @@ -117,6 +121,7 @@ are added to the `environments` array. ```json { "libpath": "/home/solana/geyser-kafka/target/release/libsolana_accountsdb_plugin_kafka.so", + "cluster": "mainnet", "shutdown_timeout_ms": 30000, "update_account_topic": "geyser.mainnet.account_update", "update_slot_topic": "geyser.mainnet.slot_update", diff --git a/src/cluster.rs b/src/cluster.rs new file mode 100644 index 0000000..6ac4b08 --- /dev/null +++ b/src/cluster.rs @@ -0,0 +1,44 @@ +use std::fmt::Display; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub enum Cluster { + #[default] + Mainnet, + Devnet, + Testnet, + Custom(String), +} + +impl From<&str> for Cluster { + fn from(value: &str) -> Self { + match value.to_lowercase().as_str() { + "mainnet" => Self::Mainnet, + "devnet" => Self::Devnet, + "testnet" => Self::Testnet, + _ => Self::Custom(value.to_string()), + } + } +} + +impl Display for Cluster { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Mainnet => write!(f, "mainnet"), + Self::Devnet => write!(f, "devnet"), + Self::Testnet => write!(f, "testnet"), + Self::Custom(value) => write!(f, "Custom({value})"), + } + } +} + +impl Cluster { + /// Derives a key to be used for Kafka events from the provided [cluster] and [program_id]. + /// This schema to derive keys is concistently used across all Ironforge services when + /// sending/receiving Kafka events. + pub fn key(&self, program_id: &str) -> String { + format!("{self}:{program_id}") + } +} diff --git a/src/config.rs b/src/config.rs index 86b5a53..9018a0a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -16,7 +16,7 @@ use std::str::FromStr; use solana_program::pubkey::Pubkey; -use crate::EnvConfig; +use crate::{Cluster, EnvConfig}; use { crate::PrometheusService, @@ -37,6 +37,9 @@ use { /// Plugin config. #[derive(Deserialize)] pub struct Config { + #[serde(default = "Cluster::default")] + pub cluster: Cluster, + /// Time the plugin is given to flush out all messages to Kafka /// and gracefully shutdown upon exit request. #[serde(default)] @@ -91,6 +94,7 @@ pub struct Config { impl Default for Config { fn default() -> Self { Self { + cluster: Cluster::default(), shutdown_timeout_ms: 30_000, update_account_topic: Default::default(), update_account_topic_overrides: Default::default(), diff --git a/src/lib.rs b/src/lib.rs index 8729a26..0c01f4e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,7 @@ use solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin; mod allowlist; +mod cluster; mod config; mod env_config; mod errors; @@ -30,6 +31,7 @@ mod publisher; mod version; pub use { + cluster::Cluster, config::{Config, Producer}, env_config::EnvConfig, errors::*, diff --git a/src/publisher/kafka_publisher.rs b/src/publisher/kafka_publisher.rs index ad6f830..188ee8e 100644 --- a/src/publisher/kafka_publisher.rs +++ b/src/publisher/kafka_publisher.rs @@ -14,6 +14,9 @@ use std::collections::HashMap; +use rdkafka::message::{Header, OwnedHeaders}; +use solana_program::pubkey::Pubkey; + use { crate::{ message_wrapper::EventMessage::{self, Account, Slot, Transaction}, @@ -21,7 +24,7 @@ use { StatsThreadedProducerContext, UPLOAD_ACCOUNTS_TOTAL, UPLOAD_SLOTS_TOTAL, UPLOAD_TRANSACTIONS_TOTAL, }, - Config, MessageWrapper, SlotStatusEvent, TransactionEvent, UpdateAccountEvent, + Cluster, Config, MessageWrapper, SlotStatusEvent, TransactionEvent, UpdateAccountEvent, }, log::error, prost::Message, @@ -35,6 +38,7 @@ use { pub struct KafkaPublisher { pub(crate) env: String, producer: ThreadedProducer, + cluster: Cluster, shutdown_timeout: Duration, update_account_topic: String, @@ -53,6 +57,7 @@ impl KafkaPublisher { ) -> Self { Self { env, + cluster: config.cluster.clone(), producer, shutdown_timeout: Duration::from_millis(config.shutdown_timeout_ms), update_account_topic: config.update_account_topic.clone(), @@ -69,14 +74,11 @@ impl KafkaPublisher { .get(&ev.owner) .unwrap_or(&self.update_account_topic); - let temp_key; - let (key, buf) = if self.wrap_messages { - temp_key = self.copy_and_prepend(ev.owner.as_slice(), 65u8); - (&temp_key, Self::encode_with_wrapper(Account(Box::new(ev)))) - } else { - (&ev.owner, ev.encode_to_vec()) - }; - let record = BaseRecord::, _>::to(topic).key(key).payload(&buf); + let (key, buf) = Self::account_update_key_and_data(ev, &self.cluster, self.wrap_messages); + let record = BaseRecord::, _>::to(topic) + .key(&key) + .headers(Self::headers(&self.cluster)) + .payload(&buf); let result = self.producer.send(record).map(|_| ()).map_err(|(e, _)| e); UPLOAD_ACCOUNTS_TOTAL .with_label_values(&[if result.is_ok() { "success" } else { "failed" }]) @@ -87,7 +89,7 @@ impl KafkaPublisher { pub fn update_slot_status(&self, ev: SlotStatusEvent) -> Result<(), KafkaError> { let temp_key; let (key, buf) = if self.wrap_messages { - temp_key = self.copy_and_prepend(&ev.slot.to_le_bytes(), 83u8); + temp_key = Self::copy_and_prepend(&ev.slot.to_le_bytes(), 83u8); (&temp_key, Self::encode_with_wrapper(Slot(Box::new(ev)))) } else { temp_key = ev.slot.to_le_bytes().to_vec(); @@ -106,7 +108,7 @@ impl KafkaPublisher { pub fn update_transaction(&self, ev: TransactionEvent) -> Result<(), KafkaError> { let temp_key; let (key, buf) = if self.wrap_messages { - temp_key = self.copy_and_prepend(ev.signature.as_slice(), 84u8); + temp_key = Self::copy_and_prepend(ev.signature.as_slice(), 84u8); ( &temp_key, Self::encode_with_wrapper(Transaction(Box::new(ev))), @@ -143,12 +145,51 @@ impl KafkaPublisher { .encode_to_vec() } - fn copy_and_prepend(&self, data: &[u8], prefix: u8) -> Vec { + // ----------------- + // Account Update + // ----------------- + fn account_update_key_and_data( + ev: UpdateAccountEvent, + cluster: &Cluster, + wrap_messages: bool, + ) -> (Vec, Vec) { + if wrap_messages { + let key = Self::account_update_key(cluster, &ev.owner); + let key = Self::copy_and_prepend(key.as_bytes(), 65u8); + let data = Self::encode_with_wrapper(Account(Box::new(ev))); + (key, data) + } else { + let key = Self::account_update_key(cluster, &ev.owner); + let key = key.as_bytes().to_vec(); + let data = ev.encode_to_vec(); + (key, data) + } + } + + fn copy_and_prepend(data: &[u8], prefix: u8) -> Vec { let mut temp_key = Vec::with_capacity(data.len() + 1); temp_key.push(prefix); temp_key.extend_from_slice(data); temp_key } + + fn account_update_key(cluster: &Cluster, owner: &[u8]) -> String { + // SAFETY: we don't expect the RPC to provide us invalid pubkeys ever + cluster.key(&Pubkey::try_from(owner).unwrap().to_string()) + } + + // ----------------- + // Headers + // ----------------- + fn headers(cluster: &Cluster) -> OwnedHeaders { + let headers = OwnedHeaders::new(); + let cluster = cluster.to_string(); + let cluster_header = Header { + key: "cluster", + value: Some(cluster.as_bytes()), + }; + headers.insert(cluster_header) + } } impl Drop for KafkaPublisher { @@ -158,3 +199,111 @@ impl Drop for KafkaPublisher { } } } + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use rdkafka::message::Headers; + + use super::*; + + const PK: &str = "A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz"; + fn event_with_owner(owner: &Pubkey) -> UpdateAccountEvent { + UpdateAccountEvent { + slot: 9, + pubkey: Pubkey::new_unique().to_bytes().to_vec(), + lamports: 100, + owner: owner.to_bytes().to_vec(), + executable: false, + rent_epoch: 0, + data: "account data".as_bytes().to_vec(), + write_version: 1, + txn_signature: None, + } + } + + #[test] + fn account_update_key_and_data_no_wrap() { + fn check(cluster: Cluster, expected_key: &str) { + let owner = Pubkey::from_str(PK).unwrap(); + let ev = event_with_owner(&owner); + let (key, data) = + KafkaPublisher::account_update_key_and_data(ev.clone(), &cluster, false); + let key = String::from_utf8_lossy(key.as_slice()); + + let mut bytes = data.as_slice(); + let decoded = UpdateAccountEvent::decode(&mut bytes).unwrap(); + + assert_eq!(key, expected_key); + assert_eq!(decoded, ev); + } + + check( + Cluster::Mainnet, + "mainnet:A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz", + ); + check( + Cluster::Devnet, + "devnet:A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz", + ); + check( + Cluster::Testnet, + "testnet:A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz", + ); + } + + #[test] + fn account_update_key_and_data_wrap() { + fn check(cluster: Cluster, expected_key: &str) { + let owner = Pubkey::from_str(PK).unwrap(); + let ev = event_with_owner(&owner); + let wrapped = MessageWrapper { + event_message: Some(EventMessage::Account(Box::new(ev.clone()))), + }; + + let (key, data) = KafkaPublisher::account_update_key_and_data(ev, &cluster, true); + + assert_eq!(key[0], 65u8); + let key = key.into_iter().skip(1).collect::>(); + let key = String::from_utf8_lossy(key.as_slice()); + + let mut bytes = data.as_slice(); + let decoded = MessageWrapper::decode(&mut bytes).unwrap(); + + assert_eq!(key, expected_key); + assert_eq!(decoded, wrapped); + } + + check( + Cluster::Mainnet, + "mainnet:A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz", + ); + check( + Cluster::Devnet, + "devnet:A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz", + ); + check( + Cluster::Testnet, + "testnet:A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz", + ); + } + + #[test] + fn headers_devnet() { + let headers = KafkaPublisher::headers(&Cluster::Devnet); + assert_eq!(headers.count(), 1); + let cluster_header = headers.get(0); + assert_eq!(cluster_header.key, "cluster"); + assert_eq!(cluster_header.value.unwrap(), b"devnet"); + } + + #[test] + fn headers_mainnet() { + let headers = KafkaPublisher::headers(&Cluster::Mainnet); + assert_eq!(headers.count(), 1); + let cluster_header = headers.get(0); + assert_eq!(cluster_header.key, "cluster"); + assert_eq!(cluster_header.value.unwrap(), b"mainnet"); + } +}