Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feature: supported to use host name in upstream #522

Merged
merged 11 commits into from
Sep 18, 2019
2 changes: 1 addition & 1 deletion bin/apisix
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ http {

lua_socket_log_errors off;

resolver ipv6=off local=on;
resolver {% for _, dns_addr in ipairs(dns_resolver or {}) do %} {*dns_addr*} {% end %} ipv6=off local=on;
resolver_timeout 5;

lua_http10_buffering off;
Expand Down
2 changes: 2 additions & 0 deletions conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ apisix:
# udp: # UDP proxy port list
# - 9200
# - 9211
dns_resolver: # use 114.114.114.114 as default DNS resolver, disable IPv6 and enable local DNS.
- 114.114.114.114
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

114.114.114.114 is only friendly for Chinese. I think 8.8.8.8 or 1.1.1.1 is better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the international lover feel that it is wrong, we modify it better.


etcd:
host: "http://127.0.0.1:2379" # etcd address
Expand Down
2 changes: 1 addition & 1 deletion conf/nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ http {

lua_socket_log_errors off;

resolver ipv6=off local=on;
resolver 114.114.114.114 ipv6=off local=on;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

resolver_timeout 5;

lua_http10_buffering off;
Expand Down
8 changes: 4 additions & 4 deletions doc/architecture-design-cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ APISIX 的 Upstream 除了基本的复杂均衡算法选择外,还支持对上
|名字 |可选|说明|
|------- |-----|------|
|type |必需|`roundrobin` 支持权重的负载,`chash` 一致性哈希,两者是二选一的|
|nodes |必需|哈希表,内部元素的 key 是上游机器地址列表(IP+Port 方式),value 则是节点的 weight。无论对于 `roundrobin` 还是 `chash` 算法,上游节点的选择都依赖它。特别的,权重值为 `0` 有特殊含义,通常代表该上游节点失效,永远不希望被选中。|
|nodes |必需|哈希表,内部元素的 key 是上游机器地址列表,格式为`地址 + Port`,其中地址部分可以是 IP 也可以是域名,比如 `192.168.1.100:80`、`foo.com:80`等。value 则是节点的权重,特别的,当权重值为 `0` 有特殊含义,通常代表该上游节点失效,永远不希望被选中。|
|key |必需|该选项只有类型是 `chash` 才有效。根据 `key` 来查找对应的 node `id`,相同的 `key` 在同一个对象中,永远返回相同 id|
|checks |可选|配置健康检查的参数,详细可参考[health-check](health-check.md)|
|retries |可选|使用底层的 Nginx 重试机制将请求传递给下一个上游,默认不启用重试机制|
Expand All @@ -248,8 +248,8 @@ curl http://127.0.0.1:9080/apisix/admin/upstreams/1 -X PUT -d '
"type": "roundrobin",
"nodes": {
"127.0.0.1:80": 1,
"127.0.0.2:80": 1,
"127.0.0.3:80": 1
"127.0.0.2:80": 2,
"foo.com:80": 3
}
}'

Expand All @@ -259,7 +259,7 @@ curl http://127.0.0.1:9080/apisix/admin/upstreams/2 -X PUT -d '
"key": "remote_addr",
"nodes": {
"127.0.0.1:80": 1,
"127.0.0.2:80": 1
"foo.com:80": 2
}
}'
```
Expand Down
123 changes: 105 additions & 18 deletions lua/apisix.lua
Original file line number Diff line number Diff line change
@@ -1,24 +1,32 @@
-- Copyright (C) Yuansheng Wang

local require = require
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local require = require
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local service_fetch = require("apisix.http.service").get
local admin_init = require("apisix.admin.init")
local get_var = require("resty.ngxvar").fetch
local router = require("apisix.router")
local ngx = ngx
local get_method = ngx.req.get_method
local ngx_exit = ngx.exit
local ngx_ERROR = ngx.ERROR
local math = math
local error = error
local ngx_var = ngx.var
local ipairs = ipairs
local admin_init = require("apisix.admin.init")
local get_var = require("resty.ngxvar").fetch
local router = require("apisix.router")
local ipmatcher = require("resty.ipmatcher")
local ngx = ngx
local get_method = ngx.req.get_method
local ngx_exit = ngx.exit
local ngx_ERROR = ngx.ERROR
local math = math
local error = error
local ngx_var = ngx.var
local ipairs = ipairs
local pairs = pairs
local tostring = tostring
local load_balancer


local _M = {version = 0.2}
local parsed_domain = core.lrucache.new({
ttl = 300, count = 512
})


local _M = {version = 0.3}


function _M.http_init()
Expand Down Expand Up @@ -143,6 +151,70 @@ function _M.http_ssl_phase()
end


local function parse_domain_in_up(up, ver)
local local_conf = core.config.local_conf()
local dns_resolver = local_conf and local_conf.apisix and
local_conf.apisix.dns_resolver
local new_nodes = core.table.new(0, 8)

for addr, weight in pairs(up.value.nodes) do
local host, port = core.utils.parse_addr(addr)
if not ipmatcher.parse_ipv4(host) and
not ipmatcher.parse_ipv6(host) then
local ip_info = core.utils.dns_parse(dns_resolver, host)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should not drop the error msg of dns_parse. if the domain is invalid or timeout, what's the result? we should add test cases for that.

core.log.info("parse addr: ", core.json.delay_encode(ip_info),
" resolver: ", core.json.delay_encode(dns_resolver),
" addr: ", addr)
if ip_info and ip_info.address then
new_nodes[ip_info.address .. ":" .. port] = weight
core.log.info("dns resolver domain: ", host, " to ",
ip_info.address)
end
else
new_nodes[addr] = weight
end
end

up.dns_value = core.table.clone(up.value)
up.dns_value.nodes = new_nodes
core.log.info("parse upstream which contain domain: ",
core.json.delay_encode(up))
return up
end


local function parse_domain_in_route(route, ver)
local local_conf = core.config.local_conf()
local dns_resolver = local_conf and local_conf.apisix and
local_conf.apisix.dns_resolver
local new_nodes = core.table.new(0, 8)

for addr, weight in pairs(route.value.upstream.nodes) do
local host, port = core.utils.parse_addr(addr)
if not ipmatcher.parse_ipv4(host) and
not ipmatcher.parse_ipv6(host) then
local ip_info = core.utils.dns_parse(dns_resolver, host)
core.log.info("parse addr: ", core.json.delay_encode(ip_info),
" resolver: ", core.json.delay_encode(dns_resolver),
" addr: ", addr)
if ip_info and ip_info.address then
new_nodes[ip_info.address .. ":" .. port] = weight
core.log.info("dns resolver domain: ", host, " to ",
ip_info.address)
end
else
new_nodes[addr] = weight
end
end

route.dns_value = core.table.deepcopy(route.value)
route.dns_value.upstream.nodes = new_nodes
core.log.info("parse route which contain domain: ",
core.json.delay_encode(route))
return route
end


do
local upstream_vars = {
uri = "upstream_uri",
Expand Down Expand Up @@ -226,7 +298,7 @@ function _M.http_access_phase()
end

local changed
route, changed = plugin.merge_service_route(service, route)
route, changed = plugin.merge_route(service, route)
api_ctx.matched_route = route

if changed then
Expand All @@ -240,13 +312,28 @@ function _M.http_access_phase()
api_ctx.conf_version = service.modifiedIndex
api_ctx.conf_id = service.value.id
end

else
api_ctx.conf_type = "route"
api_ctx.conf_version = route.modifiedIndex
api_ctx.conf_id = route.value.id
end

local up_id = route.value.upstream_id
if up_id then
local upstreams_etcd = core.config.fetch_created_obj("/upstreams")
if upstreams_etcd then
local upstream = upstreams_etcd:get(tostring(up_id))
if upstream.has_domain then
parsed_domain(upstream, api_ctx.conf_version,
parse_domain_in_up, upstream)
end
end

elseif route.has_domain then
route = parsed_domain(route, api_ctx.conf_version,
parse_domain_in_route, route)
end

local plugins = core.tablepool.fetch("plugins", 32, 0)
api_ctx.plugins = plugin.filter(route, plugins)

Expand Down Expand Up @@ -288,7 +375,7 @@ function _M.grpc_access_phase()
end

local changed
route, changed = plugin.merge_service_route(service, route)
route, changed = plugin.merge_route(service, route)
api_ctx.matched_route = route

if changed then
Expand Down
87 changes: 53 additions & 34 deletions lua/apisix/balancer.lua
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
local healthcheck = require("resty.healthcheck")
local roundrobin = require("resty.roundrobin")
local resty_chash = require("resty.chash")
local balancer = require("ngx.balancer")
local core = require("apisix.core")
local sub_str = string.sub
local find_str = string.find
local upstreams_etcd
local error = error
local str_char = string.char
local str_gsub = string.gsub
local pairs = pairs
local tonumber = tonumber
local tostring = tostring
local set_more_tries = balancer.set_more_tries
local balancer = require("ngx.balancer")
local core = require("apisix.core")
local sub_str = string.sub
local find_str = string.find
local error = error
local str_char = string.char
local str_gsub = string.gsub
local pairs = pairs
local tonumber = tonumber
local tostring = tostring
local set_more_tries = balancer.set_more_tries
local get_last_failure = balancer.get_last_failure
local set_timeouts = balancer.set_timeouts
local set_timeouts = balancer.set_timeouts
local upstreams_etcd


local module_name = "balancer"
Expand Down Expand Up @@ -170,8 +170,9 @@ local function pick_server(route, ctx)
core.log.info("ctx: ", core.json.delay_encode(ctx, true))
local healthcheck_parent = route
local up_id = route.value.upstream_id
local upstream = route.value.upstream
if not up_id and not upstream then
local up_conf = (route.dns_value and route.dns_value.upstream)
or route.value.upstream
if not up_id and not up_conf then
return nil, nil, "missing upstream configuration"
end

Expand All @@ -184,40 +185,40 @@ local function pick_server(route, ctx)
.. "upstream information"
end

local upstream_obj = upstreams_etcd:get(tostring(up_id))
if not upstream_obj then
local up_obj = upstreams_etcd:get(tostring(up_id))
if not up_obj then
return nil, nil, "failed to find upstream by id: " .. up_id
end
core.log.info("upstream: ", core.json.delay_encode(upstream_obj))
core.log.info("upstream: ", core.json.delay_encode(up_obj))

healthcheck_parent = upstream_obj
upstream = upstream_obj.value
version = upstream_obj.modifiedIndex
key = upstream.type .. "#upstream_" .. up_id
healthcheck_parent = up_obj
up_conf = up_obj.dns_value or up_obj.value
version = up_obj.modifiedIndex
key = up_conf.type .. "#upstream_" .. up_id

else
version = ctx.conf_version
key = upstream.type .. "#route_" .. route.value.id
key = up_conf.type .. "#route_" .. route.value.id
end

local checker = fetch_healthchecker(upstream, healthcheck_parent, version)
local retries = upstream.retries
local checker = fetch_healthchecker(up_conf, healthcheck_parent, version)
local retries = up_conf.retries
if retries and retries > 0 then
ctx.balancer_try_count = (ctx.balancer_try_count or 0) + 1
if checker and ctx.balancer_try_count > 1 then
local state, code = get_last_failure()
if state == "failed" then
if code == 504 then
checker:report_timeout(ctx.balancer_ip, ctx.balancer_port,
upstream.checks.host)
up_conf.checks.host)
else
checker:report_tcp_failure(ctx.balancer_ip,
ctx.balancer_port, upstream.checks.host)
ctx.balancer_port, up_conf.checks.host)
end

else
checker:report_http_status(ctx.balancer_ip, ctx.balancer_port,
upstream.checks.host, code)
up_conf.checks.host, code)
end
end

Expand All @@ -231,7 +232,7 @@ local function pick_server(route, ctx)
end

local server_picker = lrucache_server_picker(key, version,
create_server_picker, upstream, checker)
create_server_picker, up_conf, checker)
if not server_picker then
return nil, nil, "failed to fetch server picker"
end
Expand All @@ -241,8 +242,8 @@ local function pick_server(route, ctx)
return nil, nil, "failed to find valid upstream server" .. err
end

if upstream.timeout then
local timeout = upstream.timeout
if up_conf.timeout then
local timeout = up_conf.timeout
local ok, err = set_timeouts(timeout.connect, timeout.send,
timeout.read)
if not ok then
Expand Down Expand Up @@ -280,9 +281,27 @@ end
function _M.init_worker()
local err
upstreams_etcd, err = core.config.new("/upstreams", {
automatic = true,
item_schema = core.schema.upstream
})
automatic = true,
item_schema = core.schema.upstream,
filter = function(upstream)
upstream.has_domain = false
if not upstream.value then
return
end

for addr, _ in pairs(upstream.value.nodes or {}) do
local host = core.utils.parse_addr(addr)
if not core.utils.parse_ipv4(host) and
not core.utils.parse_ipv6(host) then
upstream.has_domain = true
break
end
end

core.log.info("filter upstream: ",
core.json.delay_encode(upstream))
end,
})
if not upstreams_etcd then
error("failed to create etcd instance for fetching upstream: " .. err)
return
Expand Down
Loading