diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index a7788a0c1d19..f03c55b8bbb1 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -52,6 +52,11 @@ local schema = { default = "async", enum = {"async", "sync"}, }, + required_acks = { + type = "integer", + default = 1, + enum = { 0, 1, -1 }, + }, key = {type = "string"}, timeout = {type = "integer", minimum = 1, default = 3}, name = {type = "string", default = "kafka logger"}, @@ -207,6 +212,7 @@ function _M.log(conf, ctx) broker_config["request_timeout"] = conf.timeout * 1000 broker_config["producer_type"] = conf.producer_type + broker_config["required_acks"] = conf.required_acks local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer, broker_list, broker_config) diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index b6d1583fd868..4da9a236503e 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -48,6 +48,7 @@ For more info on Batch-Processor in Apache APISIX please refer. | broker_list | object | required | | | An array of Kafka brokers. | | kafka_topic | string | required | | | Target topic to push data. | | producer_type | string | optional | async | ["async", "sync"] | Producer's mode of sending messages. | +| required_acks | integer | optional | 1 | [0, 1, -1] | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. Semantics is the same as kafka producer acks(If set `acks=0` then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. `acks=1` This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. `acks=-1` This means the leader will wait for the full set of in-sync replicas to acknowledge the record.). | | key | string | optional | | | Used for partition allocation of messages. | | timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to send data. | | name | string | optional | "kafka logger" | | A unique identifier to identity the batch processor. | diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md index 21cec7b5996c..05d6f46ac2da 100644 --- a/docs/zh/latest/plugins/kafka-logger.md +++ b/docs/zh/latest/plugins/kafka-logger.md @@ -46,6 +46,7 @@ title: kafka-logger | broker_list | object | 必须 | | | 要推送的 kafka 的 broker 列表。 | | kafka_topic | string | 必须 | | | 要推送的 topic。 | | producer_type | string | 可选 | async | ["async", "sync"] | 生产者发送消息的模式。 | +| required_acks | integer | 可选 | 1 | [0, 1, -1] | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。这个参数是为了保证发送请求的可靠性。语义同 kafka 生产者的 acks 参数(如果设置 `acks=0`,则 producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。如果设置 `acks=1`,leader 节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。如果设置 `acks=-1`,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。)。 | | key | string | 可选 | | | 用于消息的分区分配。 | | timeout | integer | 可选 | 3 | [1,...] | 发送数据的超时时间。 | | name | string | 可选 | "kafka logger" | | batch processor 的唯一标识。 | diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 46a520c84bc5..bf78ef22e349 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -722,3 +722,138 @@ GET /t [qr/partition_id: 1/, qr/partition_id: 0/, qr/partition_id: 2/] + + + +=== TEST 20: required_acks, matches none of the enum values +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + broker_list = { + ["127.0.0.1"] = 3000 + }, + required_acks = 10, + kafka_topic ="test", + key= "key1" + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +property "required_acks" validation failed: matches none of the enum values +done +--- no_error_log +[error] + + + +=== TEST 21: report log to kafka, with required_acks(1, 0, -1) +--- config +location /t { + content_by_lua_block { + local data = { + { + input = { + plugins = { + ["kafka-logger"] = { + broker_list = { + ["127.0.0.1"] = 9092 + }, + kafka_topic = "test2", + producer_type = "sync", + timeout = 1, + batch_max_size = 1, + required_acks = 1, + meta_format = "origin", + } + }, + upstream = { + nodes = { + ["127.0.0.1:1980"] = 1 + }, + type = "roundrobin" + }, + uri = "/hello", + }, + }, + { + input = { + plugins = { + ["kafka-logger"] = { + broker_list = { + ["127.0.0.1"] = 9092 + }, + kafka_topic = "test2", + producer_type = "sync", + timeout = 1, + batch_max_size = 1, + required_acks = -1, + meta_format = "origin", + } + }, + upstream = { + nodes = { + ["127.0.0.1:1980"] = 1 + }, + type = "roundrobin" + }, + uri = "/hello", + }, + }, + { + input = { + plugins = { + ["kafka-logger"] = { + broker_list = { + ["127.0.0.1"] = 9092 + }, + kafka_topic = "test2", + producer_type = "sync", + timeout = 1, + batch_max_size = 1, + required_acks = 0, + meta_format = "origin", + } + }, + upstream = { + nodes = { + ["127.0.0.1:1980"] = 1 + }, + type = "roundrobin" + }, + uri = "/hello", + }, + }, + } + + local t = require("lib.test_admin").test + local err_count = 0 + for i in ipairs(data) do + local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, data[i].input) + + if code >= 300 then + err_count = err_count + 1 + end + ngx.print(body) + + t('/hello', ngx.HTTP_GET) + end + + assert(err_count == 0) + } +} +--- request +GET /t +--- no_error_log +[error] +--- error_log +send data to kafka: GET /hello +send data to kafka: GET /hello +send data to kafka: GET /hello