Skip to content

Commit a16d17f

Browse files
authored
feat: send_buffered function to send events (#1829)
* fix: send_event error handling * enha: handle_delivery_result function
1 parent 3cfa165 commit a16d17f

File tree

3 files changed

+19
-9
lines changed

3 files changed

+19
-9
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ crossbeam-channel = "=0.5.13"
6868
futures = "=0.3.30"
6969
futures-timer = "=3.0.3"
7070
futures-util = "=0.3.31"
71+
futures-channel = "=0.3.31"
7172

7273
# ethereum / rpc
7374
ethabi = "=18.0.0"

src/infra/kafka/kafka.rs

+17-9
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ use anyhow::Result;
22
use clap::Parser;
33
use clap::ValueEnum;
44
use display_json::DebugAsJson;
5+
use futures::Stream;
6+
use futures::StreamExt;
57
use rdkafka::message::Header;
68
use rdkafka::message::OwnedHeaders;
9+
use rdkafka::producer::future_producer::OwnedDeliveryResult;
710
use rdkafka::producer::DeliveryFuture;
811
use rdkafka::producer::FutureProducer;
912
use rdkafka::producer::FutureRecord;
@@ -161,14 +164,19 @@ impl KafkaConnector {
161164
}
162165

163166
pub async fn send_event<T: Event>(&self, event: T) -> Result<()> {
164-
match self.queue_event(event) {
165-
Ok(fut) =>
166-
if let Err(e) = fut.await {
167-
log_and_err!(reason = e, "failed to publish kafka event")
168-
} else {
169-
Ok(())
170-
},
171-
Err(e) => Err(e),
172-
}
167+
handle_delivery_result(self.queue_event(event)?.await)
168+
}
169+
170+
pub fn send_buffered<T: Event>(&self, events: Vec<T>, buffer_size: usize) -> Result<impl Stream<Item = Result<()>>> {
171+
let futures: Vec<DeliveryFuture> = events.into_iter().map(|event| self.queue_event(event)).collect::<Result<Vec<_>, _>>()?; // This could fail because the queue is full
172+
Ok(futures::stream::iter(futures).buffered(buffer_size).map(handle_delivery_result))
173+
}
174+
}
175+
176+
fn handle_delivery_result(res: Result<OwnedDeliveryResult, futures_channel::oneshot::Canceled>) -> Result<()> {
177+
match res {
178+
Err(e) => log_and_err!(reason = e, "failed to publish kafka event"),
179+
Ok(Err((e, _))) => log_and_err!(reason = e, "failed to publish kafka event"),
180+
Ok(_) => Ok(()),
173181
}
174182
}

0 commit comments

Comments
 (0)