Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Send events to error handlers when broker is unavailable #144

Merged
merged 2 commits into from
Nov 5, 2021

Conversation

dilini-muthumala
Copy link
Contributor

@dilini-muthumala dilini-muthumala commented Nov 4, 2021

Purpose

Resolves #143

This is a new improvement.
Currently when the broker is not available, kafka producer will retry sending the event, but if the broker does not become available during that retry period, the event will be dropped.
After this improvement, such events can be redirected to Siddhi Error Handlers (STREAM/STORE/WAIT).

Goals

Add Error handling support for Kafka Sink

Approach

Throw ConnectionUnavailableException when TimeoutException occurrs.

Sample Configuration

Please refer sample siddhi app below. Here, when the broker is not available, we have specified to redirect the events to Error Store (on.error='STORE' )

@App:name("HTTPToKafka")
@App:description('Consume events from HTTP and publish to Kafka.')

@Source(type = 'http', receiver.url='http://localhost:8006/SweetProductionStream', basic.auth.enabled='false',
        @map(type='json'))
define stream SweetProductionStream (name string, amount double);

@sink(type='kafka',
          on.error='STORE',
          topic='bulk-orders',
          bootstrap.servers='localhost:9092',
--       partition.no='0',
          is.synchronous='true',
          retry.send='true',
          @map(type='json'))
    define stream BulkOrdersStream (name string, amount double);

from SweetProductionStream
select name, amount
insert into BulkOrdersStream;

Release note

Add Siddhi error handling (Stream/Store/Wait) support for Kafka Sink

Documentation

General Siddhi doc for error handling is applicable for the Kafka Sink as well.

Automation tests

Security checks

Related PRs

Below PR needs to be merged for this feature to work properly - siddhi-io/siddhi#1755

Migrations (if applicable)

None.

Test environment

SI 4.0.0
kafka_2.11-0.10.2.1
java version "1.8.0_301"
Java(TM) SE Runtime Environment (build 1.8.0_301-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.301-b09, mixed mode)

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[KafkaSink] Send events to error store when the broker is not available
2 participants