[WIP] Kafka Logger. #1260

Closed
wants to merge 10 commits into from
7 changes: 7 additions & 0 deletions .travis.yml
@@ -56,6 +56,13 @@ before_cache:
- brew cleanup

before_install:
- docker pull bitnami/zookeeper:3.6.0
Member

We should install dependencies in the OS-specific scripts, e.g.: https://github.com/apache/incubator-apisix/blob/master/.travis/linux_openresty_runner.sh#L35

The steps may differ a little between operating systems, e.g. Linux and OSX.

- docker pull bitnami/kafka:latest
- docker network create kafka-net --driver bridge
- docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
- docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
- sleep 5
- docker exec -it kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
- echo $OSNAME
- $PWD/.travis/${OSNAME}_runner.sh before_install

1 change: 1 addition & 0 deletions conf/config.yaml
@@ -143,6 +143,7 @@ plugins: # plugin list
- proxy-cache
- tcp-logger
- proxy-mirror
- kafka-logger

stream_plugins:
- mqtt-proxy
107 changes: 107 additions & 0 deletions lua/apisix/plugins/kafka-logger.lua
@@ -0,0 +1,107 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require("resty.kafka.producer")
local pairs = pairs
local type = type
local table = table

local plugin_name = "kafka-logger"
local ngx = ngx

local timer_at = ngx.timer.at

local schema = {
    type = "object",
    properties = {
        broker_list = {
            type = "object"
        },
        timeout = {   -- timeout in milliseconds
            type = "integer", minimum = 1, default = 2000
        },
        kafka_topic = {type = "string"},
        async = {type = "boolean", default = false},
        key = {type = "string"},
        max_retry = {type = "integer", minimum = 0, default = 3},
    },
    required = {"broker_list", "kafka_topic", "key"}
}
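
-- A minimal sketch of a conf accepted by this schema (illustrative
-- values, not defaults; broker_list maps broker host to port):
--   {
--       broker_list = {["127.0.0.1"] = 9092},
--       kafka_topic = "test2",
--       key         = "key1",
--       timeout     = 2000,   -- milliseconds
--       async       = false,
--       max_retry   = 3,
--   }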

local _M = {
    version = 0.1,
    priority = 403,
    name = plugin_name,
    schema = schema,
}

function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end

local function log(premature, conf, log_message)
    if premature then
        return
    end

    if core.table.nkeys(conf.broker_list) == 0 then
        core.log.error("the broker_list in the plugin conf is empty")
        return
    end

    local broker_list = {}
    local broker_config = {}

    -- conf.broker_list maps broker host to port; convert it into the
    -- array of {host = ..., port = ...} tables that lua-resty-kafka expects
    for host, port in pairs(conf.broker_list) do
        if type(host) == 'string'
           and type(port) == 'number' then

            local broker = {
                host = host, port = port
            }
            table.insert(broker_list, broker)
        end
    end

    broker_config["request_timeout"] = conf.timeout
    broker_config["max_retry"] = conf.max_retry

    -- Async producers buffer log entries and flush them in batches once
    -- the buffer fills or the flush timer fires, instead of sending on
    -- every request.
    if conf.async then
        broker_config["producer_type"] = "async"
    end

    -- pass broker_config so the timeout/retry/async settings take effect
    local prod, err = producer:new(broker_list, broker_config)
    if err then
        core.log.error("failed to create the kafka producer: ", err)
        return
    end

    local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
    if not ok then
        core.log.error("failed to send data to Kafka topic", err)
    end
end

-- The cosocket API is unavailable in the log phase, so hand the payload
-- off to a zero-delay timer, where the producer can open connections to
-- the brokers.
function _M.log(conf)
    return timer_at(0, log, conf,
                    core.json.encode(log_util.get_full_log(ngx)))
end

return _M
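
A minimal standalone sketch of the send path this plugin builds on (not part of the diff); the broker address 127.0.0.1:9092, topic test2, and key key1 mirror the CI setup above and are illustrative only:

    local producer = require("resty.kafka.producer")

    -- assumed local broker, matching the CI container above
    local broker_list = {
        {host = "127.0.0.1", port = 9092},
    }

    -- sync producer with the plugin's defaults: 2000 ms timeout, 3 retries
    local prod, err = producer:new(broker_list,
                                   {request_timeout = 2000, max_retry = 3})
    if err then
        ngx.log(ngx.ERR, "failed to create the kafka producer: ", err)
        return
    end

    -- send(topic, key, message): the offset on success, nil + err on failure
    local offset, send_err = prod:send("test2", "key1", "hello kafka-logger")
    if not offset then
        ngx.log(ngx.ERR, "failed to send: ", send_err)
    end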

2 changes: 2 additions & 0 deletions rockspec/apisix-master-0.rockspec
@@ -48,6 +48,8 @@ dependencies = {
"lua-resty-prometheus = 1.0",
"jsonschema = 0.8",
"lua-resty-ipmatcher = 0.6",
"lua-resty-kafka = 0.07",

}

build = {
2 changes: 1 addition & 1 deletion t/admin/plugins.t
@@ -30,7 +30,7 @@ __DATA__
--- request
GET /apisix/admin/plugins/list
--- response_body_like eval
qr/\["limit-req","limit-count","limit-conn","key-auth","basic-auth","prometheus","node-status","jwt-auth","zipkin","ip-restriction","grpc-transcode","serverless-pre-function","serverless-post-function","openid-connect","proxy-rewrite","redirect","response-rewrite","fault-injection","udp-logger","wolf-rbac","proxy-cache","tcp-logger","proxy-mirror"\]/
qr/\["limit-req","limit-count","limit-conn","key-auth","basic-auth","prometheus","node-status","jwt-auth","zipkin","ip-restriction","grpc-transcode","serverless-pre-function","serverless-post-function","openid-connect","proxy-rewrite","redirect","response-rewrite","fault-injection","udp-logger","wolf-rbac","proxy-cache","tcp-logger","proxy-mirror","kafka-logger"\]/
--- no_error_log
[error]

1 change: 1 addition & 0 deletions t/debug/debug-mode.t
@@ -75,6 +75,7 @@ loaded plugin and sort by priority: 899 name: response-rewrite
loaded plugin and sort by priority: 506 name: grpc-transcode
loaded plugin and sort by priority: 500 name: prometheus
loaded plugin and sort by priority: 405 name: tcp-logger
loaded plugin and sort by priority: 403 name: kafka-logger
loaded plugin and sort by priority: 400 name: udp-logger
loaded plugin and sort by priority: 0 name: example-plugin
loaded plugin and sort by priority: -1000 name: zipkin
243 changes: 243 additions & 0 deletions t/plugin/kafka-logger.t
@@ -0,0 +1,243 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX 'no_plan';

repeat_each(1);
no_long_string();
no_root_location();
run_tests;

__DATA__

=== TEST 1: sanity
--- config
location /t {
content_by_lua_block {
local plugin = require("apisix.plugins.kafka-logger")
local ok, err = plugin.check_schema({
kafka_topic = "test",
key = "key1",
broker_list = {
["127.0.0.1"] = 3
}
})
if not ok then
ngx.say(err)
end

ngx.say("done")
}
}
--- request
GET /t
--- response_body
done
--- no_error_log
[error]


=== TEST 2: missing broker list
--- config
location /t {
content_by_lua_block {
local plugin = require("apisix.plugins.kafka-logger")
local ok, err = plugin.check_schema({kafka_topic = "test", key= "key1"})
if not ok then
ngx.say(err)
end
ngx.say("done")
}
}
--- request
GET /t
--- response_body
property "broker_list" is required
done
--- no_error_log
[error]

=== TEST 3: wrong type: timeout given as a string
--- config
location /t {
content_by_lua_block {
local plugin = require("apisix.plugins.kafka-logger")
local ok, err = plugin.check_schema({
broker_list = {
["127.0.0.1"] = 3000
},
timeout = "10",
kafka_topic ="test",
key= "key1"
})
if not ok then
ngx.say(err)
end
ngx.say("done")
}
}
--- request
GET /t
--- response_body
property "timeout" validation failed: wrong type: expected integer, got string
done
--- no_error_log
[error]

=== TEST 4: add plugin
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"plugins": {
"kafka-logger": {
"broker_list" :
{
"127.0.0.1":9092
},
"kafka_topic" : "test2",
"key" : "key1"
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}]],
[[{
"node": {
"value": {
"plugins": {
"kafka-logger": {
"broker_list" :
{
"127.0.0.1":9092
},
"kafka_topic" : "test2",
"key" : "key1"
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
},
"key": "/apisix/routes/1"
},
"action": "set"
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]

=== TEST 5: access
--- request
GET /hello
--- response_body
hello world
--- no_error_log
[error]
--- wait: 0.2

=== TEST 6: error log
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"plugins": {
"kafka-logger": {
"broker_list" :
{
"127.0.0.1":9092,
"127.0.0.1":9093
},
"kafka_topic" : "test2",
"key" : "key1"
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}]],
[[{
"node": {
"value": {
"plugins": {
"kafka-logger": {
"broker_list" :
{
"127.0.0.1":9092,
"127.0.0.1":9093
},
"kafka_topic" : "test2",
"key" : "key1"
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
},
"key": "/apisix/routes/1"
},
"action": "set"
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
local http = require "resty.http"
local httpc = http.new()
local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
local res, err = httpc:request_uri(uri, {method = "GET"})
}
}
--- request
GET /t
--- error_log
failed to send data to Kafka topic
[error]
--- wait: 0.2