This bundle aims to provide a simple Kafka transport for Symfony Messenger.
Open a command console, enter your project directory and execute:
$ composer require koco/messenger-kafka
Open a command console, enter your project directory and execute the following command to download the latest stable version of this bundle:
$ composer require koco/messenger-kafka
This command requires you to have Composer installed globally, as explained in the installation chapter of the Composer documentation.
Then, enable the bundle by adding it to the list of registered bundles
in the config/bundles.php
file of your project:
// config/bundles.php
return [
// ...
Koco\Kafka\KocoKafkaBundle::class => ['all' => true],
];
Specify a DSN starting with either kafka://
or kafka+ssl://
. There can be multiple brokers separated by ,
kafka://my-local-kafka:9092
kafka+ssl://my-staging-kafka:9093
kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-01:9093
The configuration options for kafka_conf
and topic_conf
can be found here.
It is highly recommended to set enable.auto.offset.store
to false
for consumers. Otherwise every message is acknowledged, regardless of any error thrown by the message handlers.
framework:
messenger:
transports:
producer:
dsn: '%env(KAFKA_URL)%'
options:
topic:
name: 'events'
kafka_conf:
security.protocol: 'sasl_ssl'
ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
sasl.username: '%env(KAFKA_SASL_USERNAME)%'
sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
sasl.mechanisms: 'SCRAM-SHA-256'
consumer:
dsn: '%env(KAFKA_URL)%'
options:
commitAsync: true
receiveTimeout: 10000
topic:
name: "events"
kafka_conf:
enable.auto.offset.store: 'false'
group.id: 'my-group-id' # should be unique per consumer
security.protocol: 'sasl_ssl'
ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
sasl.username: '%env(KAFKA_SASL_USERNAME)%'
sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
sasl.mechanisms: 'SCRAM-SHA-256'
max.poll.interval.ms: '45000'
topic_conf:
auto.offset.reset: 'smallest'