diff --git a/component/src/main/java/io/siddhi/extension/io/kafka/sink/KafkaSink.java b/component/src/main/java/io/siddhi/extension/io/kafka/sink/KafkaSink.java index 0103743f..219247fa 100644 --- a/component/src/main/java/io/siddhi/extension/io/kafka/sink/KafkaSink.java +++ b/component/src/main/java/io/siddhi/extension/io/kafka/sink/KafkaSink.java @@ -114,7 +114,7 @@ type = {DataType.STRING}, defaultValue = "null"), @Parameter(name = "is.synchronous", - description = "The Kafka sync will publish the events to the server synchronously when the" + + description = "The Kafka sink will publish the events to the server synchronously when the" + "value is set to `true`, and asynchronously if otherwise", optional = true, type = {DataType.BOOL}, @@ -298,6 +298,10 @@ public void publish(Object payload, DynamicOptions dynamicOptions, KafkaSinkStat metrics.getErrorCountPerStream( streamId, topic, Integer.parseInt(partitionNo), e.getClass().getSimpleName()).inc(); } + if (e.getMessage().contains("apache.kafka.common.errors.TimeoutException")) { + throw new ConnectionUnavailableException("TimeoutException occurred when trying to send " + + "message. Broker may not be unavailable.", e); + } } } else { if (null == partitionNo) {