Skip to content

Commit

Permalink
feat: Add back blocking send method based on non-blocking version (#311)
Browse files Browse the repository at this point in the history
  • Loading branch information
cirias authored May 15, 2024
1 parent 4f243f6 commit ed4914a
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 12 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
pulsar-version: [ 2.10.4.3, 2.11.1.2, 3.0.0.1 ]
pulsar-version: [ 2.10.4, 2.11.2, 3.0.4, 3.1.3 ]
steps:
- name: Start Pulsar Standalone Container
run: docker run --name pulsar -p 6650:6650 -p 8080:8080 -d -e GITHUB_ACTIONS=true -e CI=true streamnative/pulsar:${{ matrix.pulsar-version }} /pulsar/bin/pulsar standalone
run: docker run --name pulsar -p 6650:6650 -p 8080:8080 -d -e GITHUB_ACTIONS=true -e CI=true apachepulsar/pulsar:${{ matrix.pulsar-version }} bin/pulsar standalone
- uses: actions/checkout@v3
- uses: Swatinem/rust-cache@v2
- name: Run tests
Expand Down
2 changes: 1 addition & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ async fn run_producer<Exe: Executor>(
resolver,
}) = messages.next().await
{
match producer.send(topic, payload).await {
match producer.send_non_blocking(topic, payload).await {
Ok(send_f) => {
let delay_f = client
.executor
Expand Down
1 change: 1 addition & 0 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,7 @@ mod tests {
let consumer_created = rx.await.unwrap();
assert!(consumer_created);

#[allow(deprecated)]
producer
.send(TestData {
age: 30,
Expand Down
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ mod tests {
data: "data".to_string(),
id,
};
sends.push(producer.send(&message).await.unwrap());
sends.push(producer.send_non_blocking(&message).await.unwrap());
}
try_join_all(sends).await.unwrap();

Expand Down Expand Up @@ -536,13 +536,13 @@ mod tests {

let mut send_receipts = Vec::new();
for i in 0..4 {
send_receipts.push(producer.send(i.to_string()).await.unwrap());
send_receipts.push(producer.send_non_blocking(i.to_string()).await.unwrap());
}
assert!(timeout(Duration::from_millis(100), consumer.next())
.await
.is_err());

send_receipts.push(producer.send(5.to_string()).await.unwrap());
send_receipts.push(producer.send_non_blocking(5.to_string()).await.unwrap());

timeout(Duration::from_millis(100), try_join_all(send_receipts))
.await
Expand All @@ -565,7 +565,7 @@ mod tests {
assert_eq!(count, 5);
let mut send_receipts = Vec::new();
for i in 5..9 {
send_receipts.push(producer.send(i.to_string()).await.unwrap());
send_receipts.push(producer.send_non_blocking(i.to_string()).await.unwrap());
}
producer.send_batch().await.unwrap();
timeout(Duration::from_millis(100), try_join_all(send_receipts))
Expand Down
91 changes: 86 additions & 5 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,24 @@ impl<Exe: Executor> MultiTopicProducer<Exe> {

/// sends one message on a topic
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
#[deprecated = "instead use send_non_blocking"]
pub async fn send<T: SerializeMessage + Sized, S: Into<String>>(
&mut self,
topic: S,
message: T,
) -> Result<SendFuture, Error> {
let fut = self.send_non_blocking(topic, message).await?;
let (tx, rx) = oneshot::channel();
let _ = tx.send(fut.await);
Ok(SendFuture(rx))
}

/// sends one message on a topic
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn send_non_blocking<T: SerializeMessage + Sized, S: Into<String>>(
&mut self,
topic: S,
message: T,
) -> Result<SendFuture, Error> {
let message = T::serialize_message(message)?;
let topic = topic.into();
Expand All @@ -222,16 +236,40 @@ impl<Exe: Executor> MultiTopicProducer<Exe> {
}

let producer = self.producers.get_mut(&topic).unwrap();
producer.send(message).await
producer.send_non_blocking(message).await
}

/// sends a list of messages on a topic
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
#[deprecated = "instead use send_all_non_blocking"]
pub async fn send_all<'a, 'b, T, S, I>(
&mut self,
topic: S,
messages: I,
) -> Result<Vec<SendFuture>, Error>
where
'b: 'a,
T: 'b + SerializeMessage + Sized,
I: IntoIterator<Item = T>,
S: Into<String>,
{
let topic: String = topic.into();
let mut futs = vec![];
for message in messages {
#[allow(deprecated)]
let fut = self.send(&topic, message).await?;
futs.push(fut);
}
Ok(futs)
}

/// sends a list of messages on a topic
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn send_all_non_blocking<'a, 'b, T, S, I>(
&mut self,
topic: S,
messages: I,
) -> Result<Vec<SendFuture>, Error>
where
'b: 'a,
T: 'b + SerializeMessage + Sized,
Expand All @@ -241,7 +279,7 @@ impl<Exe: Executor> MultiTopicProducer<Exe> {
let topic = topic.into();
let mut sends = Vec::new();
for msg in messages {
sends.push(self.send(&topic, msg).await);
sends.push(self.send_non_blocking(&topic, msg).await);
}
// TODO determine whether to keep this approach or go with the partial send, but more mem
// friendly lazy approach. serialize all messages before sending to avoid a partial
Expand Down Expand Up @@ -328,15 +366,15 @@ impl<Exe: Executor> Producer<Exe> {
///
/// ```rust,no_run
/// # async fn run(mut producer: pulsar::Producer<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
/// let f1 = producer.send("hello").await?;
/// let f2 = producer.send("world").await?;
/// let f1 = producer.send_non_blocking("hello").await?;
/// let f2 = producer.send_non_blocking("world").await?;
/// let receipt1 = f1.await?;
/// let receipt2 = f2.await?;
/// # Ok(())
/// # }
/// ```
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn send<T: SerializeMessage + Sized>(
pub async fn send_non_blocking<T: SerializeMessage + Sized>(
&mut self,
message: T,
) -> Result<SendFuture, Error> {
Expand All @@ -346,6 +384,39 @@ impl<Exe: Executor> Producer<Exe> {
}
}

/// Sends a message
///
/// this function is similar to send_non_blocking then waits the returned `SendFuture`
/// for the receipt.
///
/// It returns the returned receipt in another `SendFuture` to be backward compatible.
///
/// It is deprecated, and users should instread use send_non_blocking. Users should await the
/// returned `SendFuture` if blocking is needed.
///
/// Usage:
///
/// ```rust,no_run
/// # async fn run(mut producer: pulsar::Producer<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
/// let f1 = producer.send("hello").await?;
/// let f2 = producer.send("world").await?;
/// let receipt1 = f1.await?;
/// let receipt2 = f2.await?;
/// # Ok(())
/// # }
/// ```
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
#[deprecated = "instead use send_non_blocking"]
pub async fn send<T: SerializeMessage + Sized>(
&mut self,
message: T,
) -> Result<SendFuture, Error> {
let fut = self.send_non_blocking(message).await?;
let (tx, rx) = oneshot::channel();
let _ = tx.send(fut.await);
Ok(SendFuture(rx))
}

/// sends a list of messages
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn send_all<T, I>(&mut self, messages: I) -> Result<Vec<SendFuture>, Error>
Expand Down Expand Up @@ -1110,7 +1181,17 @@ impl<'a, T, Exe: Executor> MessageBuilder<'a, T, Exe> {
impl<'a, T: SerializeMessage + Sized, Exe: Executor> MessageBuilder<'a, T, Exe> {
/// sends the message through the producer that created it
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
#[deprecated = "instead use send_non_blocking"]
pub async fn send(self) -> Result<SendFuture, Error> {
let fut = self.send_non_blocking().await?;
let (tx, rx) = oneshot::channel();
let _ = tx.send(fut.await);
Ok(SendFuture(rx))
}

/// sends the message through the producer that created it
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn send_non_blocking(self) -> Result<SendFuture, Error> {
let MessageBuilder {
producer,
properties,
Expand Down

0 comments on commit ed4914a

Please # to comment.