Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 73 additions & 13 deletions apisix/discovery/eureka/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ local ngx_timer_every = ngx.timer.every
local string_sub = string.sub
local str_find = core.string.find
local log = core.log
local semaphore = require("ngx.semaphore")

local default_weight
local applications

local init_sema
local initial_fetched = false

local _M = {
version = 0.1,
Expand Down Expand Up @@ -140,31 +142,72 @@ local function parse_instance(instance)
end


local function build_endpoints()
local host_list = local_conf.discovery and local_conf.discovery.eureka and local_conf.discovery.eureka.host
if not host_list or #host_list == 0 then
log.error("do not set eureka.host")
return nil
end

local endpoints = core.table.new(#host_list, 0)
for _, h in ipairs(host_list) do
local url = h
local basic_auth
local auth_idx = str_find(url, "@")
if auth_idx then
local protocol_idx = str_find(url, "://")
local protocol = string_sub(url, 1, protocol_idx + 2)
local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1)
local other = string_sub(url, auth_idx + 1)
url = protocol .. other
basic_auth = "Basic " .. ngx.encode_base64(user_and_password)
end
if local_conf.discovery.eureka.prefix then
url = url .. local_conf.discovery.eureka.prefix
end
if string_sub(url, #url) ~= "/" then
url = url .. "/"
end
core.table.insert(endpoints, { url = url, auth = basic_auth })
end
return endpoints
end


local function fetch_full_registry(premature)
if premature then
return
end

local request_uri, basic_auth = service_info()
if not request_uri then
local endpoints = build_endpoints()
if not endpoints or #endpoints == 0 then
return
end

local res, err = request(request_uri, basic_auth, "GET", "apps")
if not res then
log.error("failed to fetch registry", err)
return
local res, err
local used_endpoint
local start = math_random(#endpoints)
for i = 0, #endpoints - 1 do
local ep = endpoints[((start + i) % #endpoints) + 1]
log.info("eureka uri:", ep.url, ".")
local r, e = request(ep.url, ep.auth, "GET", "apps")
if r and r.body and r.status == 200 then
res = r
used_endpoint = ep
break
end
log.warn("failed to fetch registry from ", ep.url, ": ", e or (r and ("status=" .. tostring(r.status)) or "unknown"))
end

if not res.body or res.status ~= 200 then
log.error("failed to fetch registry, status = ", res.status)
if not res then
log.error("failed to fetch registry from all eureka hosts")
return
end

local json_str = res.body
local data, err = core.json.decode(json_str)
local data, derr = core.json.decode(json_str)
if not data then
log.error("invalid response body: ", json_str, " err: ", err)
log.error("invalid response body: ", json_str, " err: ", derr)
return
end
local apps = data.applications.application
Expand All @@ -185,17 +228,33 @@ local function fetch_full_registry(premature)
metadata = metadata,
})
if metadata then
-- remove useless data
metadata.weight = nil
end
end
end
end
applications = up_apps
log.info("successfully updated service registry, services count=",
core.table.nkeys(up_apps), "; source=", used_endpoint and used_endpoint.url or "unknown")
if not initial_fetched then
initial_fetched = true
if init_sema then
init_sema:post(1)
end
end
end


function _M.nodes(service_name)
if not applications then
if init_sema then
local ok, err = init_sema:wait(3)
if not ok then
log.warn("wait eureka initial fetch timeout: ", err)
end
end
end

if not applications then
log.error("failed to fetch nodes for : ", service_name)
return
Expand All @@ -204,12 +263,13 @@ function _M.nodes(service_name)
return applications[service_name]
end


function _M.init_worker()
default_weight = local_conf.discovery.eureka.weight or 100
log.info("default_weight:", default_weight, ".")
local fetch_interval = local_conf.discovery.eureka.fetch_interval or 30
log.info("fetch_interval:", fetch_interval, ".")
init_sema = semaphore.new()

ngx_timer_at(0, fetch_full_registry)
ngx_timer_every(fetch_interval, fetch_full_registry)
end
Expand Down