Kafka
Nightfall comes with support for reading messages from Kafka using Spark Structured Streaming; only Kafka 0.10+ is supported.
You can enable Kafka support by adding the following dependency:
```xml
<dependency>
    <groupId>com.elo7.nightfall</groupId>
    <artifactId>nightfall-kafka-0-10</artifactId>
    <version>${nightfall.version}</version>
</dependency>
```
To read messages, you just need to annotate your TaskProcessor with `@Kafka`, for example:
```java
@Task
@Singleton
class KafkaExampleTask implements TaskProcessor {

    private static final long serialVersionUID = 1L;

    private final Dataset<Row> events;

    @Inject
    KafkaExampleTask(@Kafka Dataset<Row> events) {
        this.events = events;
    }

    @Override
    public void process() {
        events
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING())
                .writeStream()
                .format("console")
                .start();
    }
}
```
See the full example in KafkaExampleTask.java.
All configurations from the Spark Kafka 0.10 integration are supported; you just need to add the `spark.kafka.` prefix, for example:
spark.kafka.subscribe=test
spark.kafka.kafka.bootstrap.servers=localhost:9092
spark.kafka.persistent.offsets=false
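Conceptually, the `spark.kafka.` prefix is stripped and the remainder becomes an option for the Spark Kafka source. A minimal sketch of that mapping in plain Java (the class and method names here are illustrative, not Nightfall's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: turning spark.kafka.-prefixed properties into
// option keys for Spark's Kafka source.
public class KafkaOptionPrefix {

    static final String PREFIX = "spark.kafka.";

    static Map<String, String> toSourceOptions(Properties props) {
        Map<String, String> options = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(PREFIX)) {
                // Strip the prefix; the remainder is the Spark option key.
                options.put(name.substring(PREFIX.length()), props.getProperty(name));
            }
        }
        return options;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("spark.kafka.subscribe", "test");
        props.setProperty("spark.kafka.kafka.bootstrap.servers", "localhost:9092");

        Map<String, String> options = toSourceOptions(props);
        System.out.println(options.get("subscribe"));               // test
        System.out.println(options.get("kafka.bootstrap.servers")); // localhost:9092
    }
}
```

Note that `spark.kafka.kafka.bootstrap.servers` keeps an inner `kafka.` segment because the Spark source itself expects the option key `kafka.bootstrap.servers`.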
Nightfall supports persisting Kafka offsets in Cassandra, but this functionality is limited to the `assign` option of Kafka.
When you enable the persistence of offset ranges, Nightfall writes to two tables:
- `offset_ranges`: persists the latest processed offset range, using the application name, topic name, and partition as primary key. When the application starts, the offset range information is retrieved from this table.
- `offset_ranges_history`: persists the history of processed offset ranges, used to detect data duplication or to find when an offset range was processed. Uses the application name, topic name, partition, and the timestamp at which the range was processed as primary key.
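Based on the primary keys described above, the schema looks roughly like the following CQL. The column names here are illustrative assumptions; the real table definitions come from the Nightfall migrations:

```sql
-- Hypothetical sketch; actual column names may differ.
CREATE TABLE offset_ranges (
    application text,
    topic text,
    partition int,
    from_offset bigint,
    until_offset bigint,
    PRIMARY KEY ((application, topic, partition))
);

CREATE TABLE offset_ranges_history (
    application text,
    topic text,
    partition int,
    processed_at timestamp,
    from_offset bigint,
    until_offset bigint,
    PRIMARY KEY ((application, topic, partition), processed_at)
);
```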
The support for Cassandra was added to control offsets in cases where you do not have reliable storage to persist the Spark checkpoints. The offset persistence has some limitations:
- If you need to add a new partition to a topic, you need to set the starting offsets for all topics and partitions.
- If you need to add a new topic to the `assign` configuration, you need to set the starting offsets for all topics and partitions.
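In those cases, the starting offsets are set through Spark's `assign` and `startingOffsets` options (with the `spark.kafka.` prefix), which take JSON in the format defined by the Spark Kafka integration. The topic name and offsets below are illustrative:

```
spark.kafka.assign={"events":[0,1]}
spark.kafka.startingOffsets={"events":{"0":1500,"1":0}}
```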
To use it, you need to enable offset persistence and add the following dependency to your project, since it is optional:
```xml
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
```
You also need to create the keyspace in Cassandra and execute the migrations, so that all required tables are created.
For more information, see Migrations.
- `spark.kafka.persistent.offsets`: enables offset persistence, default `false`.
- `spark.kafka.persistent.startingOffsets.fromRepository`: enables configuring the starting offsets from the persisted ones, default `true`; only used when `spark.kafka.persistent.offsets` is enabled.
- `spark.kafka.cassandra.hosts`: list of Cassandra hosts split by `,`, required.
- `spark.kafka.cassandra.keyspace`: Cassandra keyspace where the tables reside, required.
- `spark.kafka.cassandra.port`: Cassandra connection port, default `9042`.
- `spark.kafka.offsetRange.history.ttl.days`: time for data in the `offset_ranges_history` table to expire, default `7` days.
- `spark.kafka.cassandra.user`: Cassandra connection user name, optional.
- `spark.kafka.cassandra.datacenter`: Cassandra data center name, optional.
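Putting it together, a configuration that enables offset persistence might look like the following. The topic name, hosts, and keyspace are illustrative:

```
spark.kafka.assign={"events":[0]}
spark.kafka.kafka.bootstrap.servers=localhost:9092
spark.kafka.persistent.offsets=true
spark.kafka.cassandra.hosts=10.0.0.1,10.0.0.2
spark.kafka.cassandra.keyspace=nightfall
```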
Kafka integration is automatically configured with the `@Kafka` annotation, which uses all configurations with the `spark.kafka.` prefix, but you can customize the Kafka source by creating your own source with the Spark API, or by using KafkaDatasetBuilder.java, which lets you use Nightfall's offset persistence for Kafka.
Take a look at KafkaProvider.java to see how the Dataset from Kafka is created.