From ea58c3337e099622a2ba9521b1c9103d5a1676ac Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Mon, 9 Sep 2024 10:54:18 -0700 Subject: [PATCH] [fix][cli] Fix Pulsar-Client to allow consume encrypted messages with appropriate crypto-failure-action --- .../main/java/org/apache/pulsar/client/cli/CmdConsume.java | 5 +++++ .../src/main/java/org/apache/pulsar/client/cli/CmdRead.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java index 71c172b6337130..46adc45e2ea4d8 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -111,6 +112,9 @@ public class CmdConsume extends AbstractCmdConsume { @Option(names = {"-rs", "--replicated" }, description = "Whether the subscription status should be replicated") private boolean replicateSubscriptionState = false; + @Option(names = { "-ca", "--crypto-failure-action" }, description = "Crypto Failure Action") + private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL; + public CmdConsume() { // Do nothing super(); @@ -174,6 +178,7 @@ private int consume(String topic) { } builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull); + builder.cryptoFailureAction(cryptoFailureAction); if (isNotBlank(this.encKeyValue)) { builder.defaultCryptoKeyReader(this.encKeyValue); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java index daab436499219d..51bf2d6898b6b9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java @@ -30,6 +30,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; @@ -101,6 +102,9 @@ public class CmdRead extends AbstractCmdConsume { @Option(names = { "-pm", "--pool-messages" }, description = "Use the pooled message", arity = "1") private boolean poolMessages = true; + @Option(names = { "-ca", "--crypto-failure-action" }, description = "Crypto Failure Action") + private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL; + public CmdRead() { // Do nothing super(); @@ -153,6 +157,7 @@ private int read(String topic) { } builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull); + builder.cryptoFailureAction(cryptoFailureAction); if (isNotBlank(this.encKeyValue)) { builder.defaultCryptoKeyReader(this.encKeyValue);