Skip to content

Commit

Permalink
feature: Pulsar pool messages support in failure handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
michalcukierman committed Apr 15, 2024
1 parent b38a33b commit 02538ac
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public PulsarContinue(String channel) {
public Uni<Void> handle(PulsarIncomingMessage<?> message, Throwable reason, Metadata metadata) {
log.messageFailureContinued(channel, reason.getMessage());
log.messageFailureFullCause(reason);
message.unwrap().release();
return Uni.createFrom().voidItem()
.emitOn(message::runOnMessageContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public Uni<Void> handle(PulsarIncomingMessage<?> message, Throwable reason, Meta
// We don't commit, we just fail and stop the client.
log.messageFailureFailStop(channel);
reportFailure.accept(reason, true);
message.unwrap().release();
return Uni.createFrom().<Void> failure(reason)
.emitOn(message::runOnMessageContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public PulsarIgnore(String channel) {
public Uni<Void> handle(PulsarIncomingMessage<?> message, Throwable reason, Metadata metadata) {
log.messageFailureIgnored(channel, reason.getMessage());
log.messageFailureFullCause(reason);
message.unwrap().release();
return Uni.createFrom().completionStage(message.ack())
.emitOn(message::runOnMessageContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public Uni<Void> handle(PulsarIncomingMessage<?> message, Throwable reason, Meta
consumer.negativeAcknowledge(message.getMessageId());
log.messageFailureNacked(channel, reason.getMessage());
log.messageFailureFullCause(reason);
message.unwrap().release();
return Uni.createFrom().voidItem()
.emitOn(message::runOnMessageContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public Uni<Void> handle(PulsarIncomingMessage<?> message, Throwable reason, Meta

log.messageFailureDelayed(channel, delay.toSeconds(), reason.getMessage());
log.messageFailureFullCause(reason);
message.unwrap().release();
return Uni.createFrom()
.completionStage(
() -> consumer.reconsumeLaterAsync(message.unwrap(), customProperties, delay.toSeconds(), SECONDS))
Expand Down

0 comments on commit 02538ac

Please # to comment.