From c8a547087f5a5016722ac109d499b96afd7db16e Mon Sep 17 00:00:00 2001 From: dilini-muthumala Date: Thu, 4 Nov 2021 18:41:28 +0530 Subject: [PATCH 1/2] Send events to error store when broker is unavailable Refer https://github.com/siddhi-io/siddhi-io-kafka/issues/143 --- .../java/io/siddhi/extension/io/kafka/sink/KafkaSink.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/component/src/main/java/io/siddhi/extension/io/kafka/sink/KafkaSink.java b/component/src/main/java/io/siddhi/extension/io/kafka/sink/KafkaSink.java index 0103743f..3796b808 100644 --- a/component/src/main/java/io/siddhi/extension/io/kafka/sink/KafkaSink.java +++ b/component/src/main/java/io/siddhi/extension/io/kafka/sink/KafkaSink.java @@ -114,7 +114,7 @@ type = {DataType.STRING}, defaultValue = "null"), @Parameter(name = "is.synchronous", - description = "The Kafka sync will publish the events to the server synchronously when the" + + description = "The Kafka sink will publish the events to the server synchronously when the" + "value is set to `true`, and asynchronously if otherwise", optional = true, type = {DataType.BOOL}, @@ -298,6 +298,10 @@ public void publish(Object payload, DynamicOptions dynamicOptions, KafkaSinkStat metrics.getErrorCountPerStream( streamId, topic, Integer.parseInt(partitionNo), e.getClass().getSimpleName()).inc(); } + if (e.getMessage().contains("apache.kafka.common.errors.TimeoutException")) { + throw new ConnectionUnavailableException("Error occurred when trying to send message. " + + "Broker may not be unavailable.", e); + } } } else { if (null == partitionNo) { From 14f63bf918639d685d1dc6654730d0ee12a05ecb Mon Sep 17 00:00:00 2001 From: dilini-muthumala Date: Fri, 5 Nov 2021 08:13:30 +0530 Subject: [PATCH 2/2] Send events to error handler when broker is unavailable Refer https://github.com/siddhi-io/siddhi-io-kafka/issues/143 --- .../java/io/siddhi/extension/io/kafka/sink/KafkaSink.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/io/kafka/sink/KafkaSink.java b/component/src/main/java/io/siddhi/extension/io/kafka/sink/KafkaSink.java index 3796b808..219247fa 100644 --- a/component/src/main/java/io/siddhi/extension/io/kafka/sink/KafkaSink.java +++ b/component/src/main/java/io/siddhi/extension/io/kafka/sink/KafkaSink.java @@ -299,8 +299,8 @@ public void publish(Object payload, DynamicOptions dynamicOptions, KafkaSinkStat streamId, topic, Integer.parseInt(partitionNo), e.getClass().getSimpleName()).inc(); } if (e.getMessage().contains("apache.kafka.common.errors.TimeoutException")) { - throw new ConnectionUnavailableException("Error occurred when trying to send message. " + - "Broker may not be unavailable.", e); + throw new ConnectionUnavailableException("TimeoutException occurred when trying to send " + + "message. Broker may not be unavailable.", e); } } } else {