feat: cluster header + cluster prefixed keys for account updates (#51)
* feat: include cluster and include it in key generation

* chore: extract key + buf account update create + add tests

* publish: including cluster header with account updates

* fix: add missing cluster module

* nit: use cluster method to derive key

* docs: document cluster config property
thlorenz authored Aug 9, 2023
1 parent bebd7a0 commit ecf62db
Showing 5 changed files with 217 additions and 13 deletions.
5 changes: 5 additions & 0 deletions README.md
@@ -76,6 +76,10 @@ _Local Environment Config Values_ below.
The below config values are global and apply to all environments. Environment specific settings
are added to the `environments` array.

* **cluster** (`String`)
* The cluster value to include in messages so that Kafka event receivers know which
cluster an update came from (currently implemented for account updates only).
* possible values `mainnet|devnet|testnet` (default: `mainnet`)
* **shutdown_timeout_ms** (`u64`)
* Time the plugin is given to flush out all messages to Kafka
* and gracefully shutdown upon exit request.
@@ -117,6 +121,7 @@ are added to the `environments` array.
```json
{
"libpath": "/home/solana/geyser-kafka/target/release/libsolana_accountsdb_plugin_kafka.so",
"cluster": "mainnet",
"shutdown_timeout_ms": 30000,
"update_account_topic": "geyser.mainnet.account_update",
"update_slot_topic": "geyser.mainnet.slot_update",
44 changes: 44 additions & 0 deletions src/cluster.rs
@@ -0,0 +1,44 @@
use std::fmt::Display;

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum Cluster {
#[default]
Mainnet,
Devnet,
Testnet,
Custom(String),
}

impl From<&str> for Cluster {
fn from(value: &str) -> Self {
match value.to_lowercase().as_str() {
"mainnet" => Self::Mainnet,
"devnet" => Self::Devnet,
"testnet" => Self::Testnet,
_ => Self::Custom(value.to_string()),
}
}
}

impl Display for Cluster {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Mainnet => write!(f, "mainnet"),
Self::Devnet => write!(f, "devnet"),
Self::Testnet => write!(f, "testnet"),
Self::Custom(value) => write!(f, "Custom({value})"),
}
}
}

impl Cluster {
/// Derives a key to be used for Kafka events from this cluster and the provided `program_id`.
/// This key-derivation schema is used consistently across all Ironforge services when
/// sending/receiving Kafka events.
pub fn key(&self, program_id: &str) -> String {
format!("{self}:{program_id}")
}
}
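For orientation, here is a minimal sketch (not part of the diff) of how the conversion and key derivation above behave; the crate name `solana_accountsdb_plugin_kafka` is assumed from the README's `libpath`, and `Cluster` is re-exported from `lib.rs` further below:

```rust
use solana_accountsdb_plugin_kafka::Cluster; // crate name assumed from the README libpath

fn main() {
    // Known names map to their variants; anything else becomes Custom(..).
    assert_eq!(Cluster::from("devnet"), Cluster::Devnet);

    // Keys follow the "<cluster>:<program_id>" schema shared by Ironforge services.
    let key = Cluster::Devnet.key("A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz");
    assert_eq!(key, "devnet:A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz");
}
```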
6 changes: 5 additions & 1 deletion src/config.rs
@@ -16,7 +16,7 @@ use std::str::FromStr;

use solana_program::pubkey::Pubkey;

use crate::EnvConfig;
use crate::{Cluster, EnvConfig};

use {
crate::PrometheusService,
@@ -37,6 +37,9 @@ use {
/// Plugin config.
#[derive(Deserialize)]
pub struct Config {
#[serde(default = "Cluster::default")]
pub cluster: Cluster,

/// Time the plugin is given to flush out all messages to Kafka
/// and gracefully shutdown upon exit request.
#[serde(default)]
@@ -91,6 +94,7 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Self {
cluster: Cluster::default(),
shutdown_timeout_ms: 30_000,
update_account_topic: Default::default(),
update_account_topic_overrides: Default::default(),
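As a side note, a hedged sketch of what the `Cluster::default` serde attribute above means in practice, using stripped-down stand-ins for the crate's types so the snippet is self-contained (the real `Config` has many more fields; `serde_json` is assumed for parsing):

```rust
use serde::Deserialize;

// Minimal stand-ins for the crate's Cluster and Config, reduced to the field shown above.
#[derive(Debug, Default, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
enum Cluster {
    #[default]
    Mainnet,
    Devnet,
    Testnet,
    Custom(String),
}

#[derive(Deserialize)]
struct PartialConfig {
    #[serde(default = "Cluster::default")]
    cluster: Cluster,
}

fn main() {
    // An explicit value is honored...
    let with: PartialConfig = serde_json::from_str(r#"{ "cluster": "devnet" }"#).unwrap();
    assert_eq!(with.cluster, Cluster::Devnet);

    // ...and omitting the field falls back to the default, i.e. mainnet.
    let without: PartialConfig = serde_json::from_str("{}").unwrap();
    assert_eq!(without.cluster, Cluster::Mainnet);
}
```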
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -19,6 +19,7 @@
use solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin;

mod allowlist;
mod cluster;
mod config;
mod env_config;
mod errors;
@@ -30,6 +31,7 @@ mod publisher;
mod version;

pub use {
cluster::Cluster,
config::{Config, Producer},
env_config::EnvConfig,
errors::*,
173 changes: 161 additions & 12 deletions src/publisher/kafka_publisher.rs
@@ -14,14 +14,17 @@

use std::collections::HashMap;

use rdkafka::message::{Header, OwnedHeaders};
use solana_program::pubkey::Pubkey;

use {
crate::{
message_wrapper::EventMessage::{self, Account, Slot, Transaction},
prom::{
StatsThreadedProducerContext, UPLOAD_ACCOUNTS_TOTAL, UPLOAD_SLOTS_TOTAL,
UPLOAD_TRANSACTIONS_TOTAL,
},
Config, MessageWrapper, SlotStatusEvent, TransactionEvent, UpdateAccountEvent,
Cluster, Config, MessageWrapper, SlotStatusEvent, TransactionEvent, UpdateAccountEvent,
},
log::error,
prost::Message,
@@ -35,6 +38,7 @@ use {
pub struct KafkaPublisher {
pub(crate) env: String,
producer: ThreadedProducer<StatsThreadedProducerContext>,
cluster: Cluster,
shutdown_timeout: Duration,

update_account_topic: String,
@@ -53,6 +57,7 @@ impl KafkaPublisher {
) -> Self {
Self {
env,
cluster: config.cluster.clone(),
producer,
shutdown_timeout: Duration::from_millis(config.shutdown_timeout_ms),
update_account_topic: config.update_account_topic.clone(),
@@ -69,14 +74,11 @@ impl KafkaPublisher {
.get(&ev.owner)
.unwrap_or(&self.update_account_topic);

let temp_key;
let (key, buf) = if self.wrap_messages {
temp_key = self.copy_and_prepend(ev.owner.as_slice(), 65u8);
(&temp_key, Self::encode_with_wrapper(Account(Box::new(ev))))
} else {
(&ev.owner, ev.encode_to_vec())
};
let record = BaseRecord::<Vec<u8>, _>::to(topic).key(key).payload(&buf);
let (key, buf) = Self::account_update_key_and_data(ev, &self.cluster, self.wrap_messages);
let record = BaseRecord::<Vec<u8>, _>::to(topic)
.key(&key)
.headers(Self::headers(&self.cluster))
.payload(&buf);
let result = self.producer.send(record).map(|_| ()).map_err(|(e, _)| e);
UPLOAD_ACCOUNTS_TOTAL
.with_label_values(&[if result.is_ok() { "success" } else { "failed" }])
@@ -87,7 +89,7 @@ impl KafkaPublisher {
pub fn update_slot_status(&self, ev: SlotStatusEvent) -> Result<(), KafkaError> {
let temp_key;
let (key, buf) = if self.wrap_messages {
temp_key = self.copy_and_prepend(&ev.slot.to_le_bytes(), 83u8);
temp_key = Self::copy_and_prepend(&ev.slot.to_le_bytes(), 83u8);
(&temp_key, Self::encode_with_wrapper(Slot(Box::new(ev))))
} else {
temp_key = ev.slot.to_le_bytes().to_vec();
@@ -106,7 +108,7 @@ impl KafkaPublisher {
pub fn update_transaction(&self, ev: TransactionEvent) -> Result<(), KafkaError> {
let temp_key;
let (key, buf) = if self.wrap_messages {
temp_key = self.copy_and_prepend(ev.signature.as_slice(), 84u8);
temp_key = Self::copy_and_prepend(ev.signature.as_slice(), 84u8);
(
&temp_key,
Self::encode_with_wrapper(Transaction(Box::new(ev))),
@@ -143,12 +145,51 @@ impl KafkaPublisher {
.encode_to_vec()
}

fn copy_and_prepend(&self, data: &[u8], prefix: u8) -> Vec<u8> {
// -----------------
// Account Update
// -----------------
fn account_update_key_and_data(
ev: UpdateAccountEvent,
cluster: &Cluster,
wrap_messages: bool,
) -> (Vec<u8>, Vec<u8>) {
if wrap_messages {
let key = Self::account_update_key(cluster, &ev.owner);
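// Prefix byte 65 is ASCII 'A' (account), matching the 83 ('S') and 84 ('T') prefixes used for slot and transaction keys.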
let key = Self::copy_and_prepend(key.as_bytes(), 65u8);
let data = Self::encode_with_wrapper(Account(Box::new(ev)));
(key, data)
} else {
let key = Self::account_update_key(cluster, &ev.owner);
let key = key.as_bytes().to_vec();
let data = ev.encode_to_vec();
(key, data)
}
}

fn copy_and_prepend(data: &[u8], prefix: u8) -> Vec<u8> {
let mut temp_key = Vec::with_capacity(data.len() + 1);
temp_key.push(prefix);
temp_key.extend_from_slice(data);
temp_key
}

fn account_update_key(cluster: &Cluster, owner: &[u8]) -> String {
// SAFETY: we don't expect the RPC to provide us invalid pubkeys ever
cluster.key(&Pubkey::try_from(owner).unwrap().to_string())
}

// -----------------
// Headers
// -----------------
fn headers(cluster: &Cluster) -> OwnedHeaders {
let headers = OwnedHeaders::new();
let cluster = cluster.to_string();
let cluster_header = Header {
key: "cluster",
value: Some(cluster.as_bytes()),
};
headers.insert(cluster_header)
}
}

impl Drop for KafkaPublisher {
@@ -158,3 +199,111 @@ impl Drop for KafkaPublisher {
}
}
}

#[cfg(test)]
mod tests {
use std::str::FromStr;

use rdkafka::message::Headers;

use super::*;

const PK: &str = "A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz";
fn event_with_owner(owner: &Pubkey) -> UpdateAccountEvent {
UpdateAccountEvent {
slot: 9,
pubkey: Pubkey::new_unique().to_bytes().to_vec(),
lamports: 100,
owner: owner.to_bytes().to_vec(),
executable: false,
rent_epoch: 0,
data: "account data".as_bytes().to_vec(),
write_version: 1,
txn_signature: None,
}
}

#[test]
fn account_update_key_and_data_no_wrap() {
fn check(cluster: Cluster, expected_key: &str) {
let owner = Pubkey::from_str(PK).unwrap();
let ev = event_with_owner(&owner);
let (key, data) =
KafkaPublisher::account_update_key_and_data(ev.clone(), &cluster, false);
let key = String::from_utf8_lossy(key.as_slice());

let mut bytes = data.as_slice();
let decoded = UpdateAccountEvent::decode(&mut bytes).unwrap();

assert_eq!(key, expected_key);
assert_eq!(decoded, ev);
}

check(
Cluster::Mainnet,
"mainnet:A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz",
);
check(
Cluster::Devnet,
"devnet:A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz",
);
check(
Cluster::Testnet,
"testnet:A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz",
);
}

#[test]
fn account_update_key_and_data_wrap() {
fn check(cluster: Cluster, expected_key: &str) {
let owner = Pubkey::from_str(PK).unwrap();
let ev = event_with_owner(&owner);
let wrapped = MessageWrapper {
event_message: Some(EventMessage::Account(Box::new(ev.clone()))),
};

let (key, data) = KafkaPublisher::account_update_key_and_data(ev, &cluster, true);

assert_eq!(key[0], 65u8);
let key = key.into_iter().skip(1).collect::<Vec<_>>();
let key = String::from_utf8_lossy(key.as_slice());

let mut bytes = data.as_slice();
let decoded = MessageWrapper::decode(&mut bytes).unwrap();

assert_eq!(key, expected_key);
assert_eq!(decoded, wrapped);
}

check(
Cluster::Mainnet,
"mainnet:A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz",
);
check(
Cluster::Devnet,
"devnet:A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz",
);
check(
Cluster::Testnet,
"testnet:A15Y2eoMNGeX4516TYTaaMErwabCrf9AB9mrzFohdQJz",
);
}

#[test]
fn headers_devnet() {
let headers = KafkaPublisher::headers(&Cluster::Devnet);
assert_eq!(headers.count(), 1);
let cluster_header = headers.get(0);
assert_eq!(cluster_header.key, "cluster");
assert_eq!(cluster_header.value.unwrap(), b"devnet");
}

#[test]
fn headers_mainnet() {
let headers = KafkaPublisher::headers(&Cluster::Mainnet);
assert_eq!(headers.count(), 1);
let cluster_header = headers.get(0);
assert_eq!(cluster_header.key, "cluster");
assert_eq!(cluster_header.value.unwrap(), b"mainnet");
}
}
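For completeness, a hedged consumer-side sketch (not part of this commit) of how a receiver could read the `cluster` header back off a consumed record, assuming an rdkafka version that exposes the same `Headers` API used in the tests above:

```rust
use rdkafka::message::{BorrowedMessage, Headers, Message};

/// Returns the value of the "cluster" header on a consumed record, if present.
fn cluster_of(msg: &BorrowedMessage<'_>) -> Option<String> {
    let headers = msg.headers()?;
    for i in 0..headers.count() {
        let header = headers.get(i);
        if header.key == "cluster" {
            return header
                .value
                .map(|bytes| String::from_utf8_lossy(bytes).into_owned());
        }
    }
    None
}
```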
