Skip to content

Commit aa3f113

Browse files
authored
feat: kafka partition key and headers (#1821)
1 parent b7776d4 commit aa3f113

File tree

2 files changed

+45
-20
lines changed

2 files changed

+45
-20
lines changed

src/infra/kafka/kafka.rs

+20-17
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use anyhow::Result;
22
use clap::ValueEnum;
3-
use ethereum_types::H256;
3+
use rdkafka::message::Header;
4+
use rdkafka::message::OwnedHeaders;
45
use rdkafka::producer::FutureProducer;
56
use rdkafka::producer::FutureRecord;
67
use rdkafka::ClientConfig;
78

8-
use crate::eth::primitives::Hash;
99
use crate::ledger::events::Event;
10+
use crate::log_and_err;
1011

1112
#[derive(Clone, serde::Serialize, serde::Deserialize)]
1213
pub struct KafkaConfig {
@@ -104,22 +105,24 @@ impl KafkaConnector {
104105
}
105106

106107
pub async fn send_event<T: Event>(&self, event: T) -> Result<()> {
107-
let payload = serde_json::to_string(&event)?;
108+
// prepare base payload
109+
let headers = event.event_headers()?;
110+
let key = event.event_key()?;
111+
let payload = event.event_payload()?;
108112

109-
match self
110-
.producer
111-
.send(
112-
FutureRecord::to(&self.topic).payload(&payload).key(&Hash(H256::random()).to_string()),
113-
std::time::Duration::from_secs(0),
114-
)
115-
.await
116-
{
117-
Ok(_) => {
118-
tracing::info!(payload = payload, "event sent to kafka");
119-
println!("event sent to kafka {:?}", payload);
120-
Ok(())
121-
}
122-
Err(e) => Err(anyhow::anyhow!("failed to send event to kafka: {:?}", e)),
113+
// prepare kafka payload
114+
let mut kafka_headers = OwnedHeaders::new_with_capacity(headers.len());
115+
for (key, value) in headers.iter() {
116+
let header = Header { key, value: Some(value) };
117+
kafka_headers = kafka_headers.insert(header);
123118
}
119+
let kafka_record = FutureRecord::to(&self.topic).payload(&payload).key(&key).headers(kafka_headers);
120+
121+
// publis and handle response
122+
tracing::info!(%key, %payload, ?headers, "publishing kafka event");
123+
if let Err((e, _)) = self.producer.send(kafka_record, std::time::Duration::from_secs(0)).await {
124+
return log_and_err!(reason = e, "failed to publish kafka event");
125+
}
126+
Ok(())
124127
}
125128
}

src/ledger/events.rs

+25-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::borrow::Cow;
2+
use std::collections::HashMap;
23
use std::collections::HashSet;
34

45
use chrono::DateTime;
@@ -200,10 +201,31 @@ impl Serialize for AccountTransferDirection {
200201
// Marker Trait
201202
// -----------------------------------------------------------------------------
202203

203-
/// Marker trait for events
204-
pub trait Event: Serialize {}
204+
/// Struct is an event that can be published to external systems.
205+
pub trait Event: Serialize + Sized {
206+
/// Returns the partition key component of the event.
207+
fn event_key(&self) -> anyhow::Result<String>;
205208

206-
impl Event for AccountTransfers {}
209+
/// Returns the headers component of the event.
210+
///
211+
/// By default, it returns empty headers.
212+
fn event_headers(&self) -> anyhow::Result<HashMap<String, String>> {
213+
Ok(HashMap::default())
214+
}
215+
216+
/// Returns the payload component of the event.
217+
///
218+
/// By default, it serializes the implementing struct as JSON.
219+
fn event_payload(&self) -> anyhow::Result<String> {
220+
Ok(serde_json::to_string(self)?)
221+
}
222+
}
223+
224+
impl Event for AccountTransfers {
225+
fn event_key(&self) -> anyhow::Result<String> {
226+
Ok(self.account_address.to_string())
227+
}
228+
}
207229

208230
// -----------------------------------------------------------------------------
209231
// Conversions

0 commit comments

Comments
 (0)