Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat(kafka-logger): add required_acks option #4878

Merged
merged 11 commits into from
Aug 25, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ local schema = {
default = "async",
enum = {"async", "sync"},
},
required_acks = {
type = "integer",
default = 1,
enum = { 0, 1, -1 },
},
key = {type = "string"},
timeout = {type = "integer", minimum = 1, default = 3},
name = {type = "string", default = "kafka logger"},
Expand Down Expand Up @@ -207,6 +212,7 @@ function _M.log(conf, ctx)

broker_config["request_timeout"] = conf.timeout * 1000
broker_config["producer_type"] = conf.producer_type
broker_config["required_acks"] = conf.required_acks

local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer,
broker_list, broker_config)
Expand Down
1 change: 1 addition & 0 deletions docs/en/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ For more info on Batch-Processor in Apache APISIX please refer.
| broker_list | object | required | | | An array of Kafka brokers. |
| kafka_topic | string | required | | | Target topic to push data. |
| producer_type | string | optional | async | ["async", "sync"] | Producer's mode of sending messages. |
| required_acks | integer | optional | 1 | [0, 1, -1] | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. Semantics is the same as kafka producer acks. |
| key | string | optional | | | Used for partition allocation of messages. |
| timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to send data. |
| name | string | optional | "kafka logger" | | A unique identifier to identity the batch processor. |
Expand Down
1 change: 1 addition & 0 deletions docs/zh/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ title: kafka-logger
| broker_list | object | 必须 | | | 要推送的 kafka 的 broker 列表。 |
| kafka_topic | string | 必须 | | | 要推送的 topic。 |
| producer_type | string | 可选 | async | ["async", "sync"] | 生产者发送消息的模式。 |
| required_acks | integer | 可选 | 1 | [0, 1, -1] | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。这个参数是为了保证发送请求的可靠性。语义同 kafka 生产者的 acks 参数。 |
| key | string | 可选 | | | 用于消息的分区分配。 |
| timeout | integer | 可选 | 3 | [1,...] | 发送数据的超时时间。 |
| name | string | 可选 | "kafka logger" | | batch processor 的唯一标识。 |
Expand Down
29 changes: 29 additions & 0 deletions t/plugin/kafka-logger.t
Original file line number Diff line number Diff line change
Expand Up @@ -722,3 +722,32 @@ GET /t
[qr/partition_id: 1/,
qr/partition_id: 0/,
qr/partition_id: 2/]



=== TEST 20: required_acks, matches none of the enum values
--- config
location /t {
content_by_lua_block {
local plugin = require("apisix.plugins.kafka-logger")
local ok, err = plugin.check_schema({
broker_list = {
["127.0.0.1"] = 3000
},
required_acks = 10,
kafka_topic ="test",
key= "key1"
})
if not ok then
ngx.say(err)
end
ngx.say("done")
}
}
--- request
GET /t
--- response_body
property "required_acks" validation failed: matches none of the enum values
done
--- no_error_log
[error]