From d27b701c1f5e55fdb28b2944895aab7fcf25db1a Mon Sep 17 00:00:00 2001 From: wisonzhu Date: Thu, 23 Oct 2025 20:11:38 +0800 Subject: [PATCH 1/2] Update init.lua MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 解决eureka任意节点故障后执行reload会丢失一半请求 https://github.com/apache/apisix/issues/12610 --- apisix/discovery/eureka/init.lua | 126 ++++++++++++++++++++++++++++--- 1 file changed, 114 insertions(+), 12 deletions(-) diff --git a/apisix/discovery/eureka/init.lua b/apisix/discovery/eureka/init.lua index df72a5269e59..aa010ec1632c 100644 --- a/apisix/discovery/eureka/init.lua +++ b/apisix/discovery/eureka/init.lua @@ -29,10 +29,12 @@ local ngx_timer_every = ngx.timer.every local string_sub = string.sub local str_find = core.string.find local log = core.log +local semaphore = require("ngx.semaphore") local default_weight local applications - +local init_sema +local initial_fetched = false local _M = { version = 0.1, @@ -140,31 +142,73 @@ local function parse_instance(instance) end +local function build_endpoints() + local host_list = local_conf.discovery and local_conf.discovery.eureka and local_conf.discovery.eureka.host + if not host_list or #host_list == 0 then + log.error("do not set eureka.host") + return nil + end + + local endpoints = core.table.new(#host_list, 0) + for _, h in ipairs(host_list) do + local url = h + local basic_auth + local auth_idx = str_find(url, "@") + if auth_idx then + local protocol_idx = str_find(url, "://") + local protocol = string_sub(url, 1, protocol_idx + 2) + local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1) + local other = string_sub(url, auth_idx + 1) + url = protocol .. other + basic_auth = "Basic " .. ngx.encode_base64(user_and_password) + end + if local_conf.discovery.eureka.prefix then + url = url .. local_conf.discovery.eureka.prefix + end + if string_sub(url, #url) ~= "/" then + url = url .. "/" + end + core.table.insert(endpoints, { url = url, auth = basic_auth }) + end + return endpoints +end + + local function fetch_full_registry(premature) if premature then return end - local request_uri, basic_auth = service_info() - if not request_uri then + -- 遍历所有 eureka 端点,直到成功 + local endpoints = build_endpoints() + if not endpoints or #endpoints == 0 then return end - local res, err = request(request_uri, basic_auth, "GET", "apps") - if not res then - log.error("failed to fetch registry", err) - return + local res, err + local used_endpoint + local start = math_random(#endpoints) + for i = 0, #endpoints - 1 do + local ep = endpoints[((start + i) % #endpoints) + 1] + log.info("eureka uri:", ep.url, ".") + local r, e = request(ep.url, ep.auth, "GET", "apps") + if r and r.body and r.status == 200 then + res = r + used_endpoint = ep + break + end + log.warn("failed to fetch registry from ", ep.url, ": ", e or (r and ("status=" .. tostring(r.status)) or "unknown")) end - if not res.body or res.status ~= 200 then - log.error("failed to fetch registry, status = ", res.status) + if not res then + log.error("failed to fetch registry from all eureka hosts") return end local json_str = res.body - local data, err = core.json.decode(json_str) + local data, derr = core.json.decode(json_str) if not data then - log.error("invalid response body: ", json_str, " err: ", err) + log.error("invalid response body: ", json_str, " err: ", derr) return end local apps = data.applications.application @@ -185,17 +229,33 @@ local function fetch_full_registry(premature) metadata = metadata, }) if metadata then - -- remove useless data metadata.weight = nil end end end end applications = up_apps + log.info("successfully updated service registry, services count=", + core.table.nkeys(up_apps), "; source=", used_endpoint and used_endpoint.url or "unknown") + if not initial_fetched then + initial_fetched = true + if init_sema then + init_sema:post(1) + end + end end function _M.nodes(service_name) + if not applications then + if init_sema then + local ok, err = init_sema:wait(3) + if not ok then + log.warn("wait eureka initial fetch timeout: ", err) + end + end + end + if not applications then log.error("failed to fetch nodes for : ", service_name) return @@ -205,11 +265,53 @@ function _M.nodes(service_name) end +-- 注释掉文件缓存相关依赖与变量,避免写盘/读盘 +-- local core_io = require("apisix.core.io") +-- local io_open = io.open +-- local cache_file = ngx.config.prefix() .. "logs/eureka_registry.json" +-- local save_registry_cache +-- local load_registry_cache +-- 将文件缓存函数整体注释掉,避免写盘与读盘 +--[[ +local function save_registry_cache(apps) + local body, err = core.json.encode({ applications = apps, ts = ngx.now() }) + if not body then + log.error("encode eureka registry cache failed: ", err) + return + end + local f, ferr = io_open(cache_file, "w") + if not f then + log.error("open eureka registry cache file failed: ", ferr, + ", path: ", cache_file) + return + end + f:write(body) + f:close() +end + +local function load_registry_cache() + local body = core_io.get_file(cache_file) + if not body then + return nil, "no cache file" + end + local data, err = core.json.decode(body) + if not data or not data.applications then + return nil, "invalid cache format: " .. (err or "") + end + return data.applications +end +]] +-- 移除保存缓存调用 +-- save_registry_cache(up_apps) + + function _M.init_worker() default_weight = local_conf.discovery.eureka.weight or 100 log.info("default_weight:", default_weight, ".") local fetch_interval = local_conf.discovery.eureka.fetch_interval or 30 log.info("fetch_interval:", fetch_interval, ".") + init_sema = semaphore.new() + ngx_timer_at(0, fetch_full_registry) ngx_timer_every(fetch_interval, fetch_full_registry) end From d65e14d128403a35ac6cbda68128891b41709b8a Mon Sep 17 00:00:00 2001 From: wisonzhu Date: Fri, 24 Oct 2025 09:56:40 +0800 Subject: [PATCH 2/2] Update init.lua reloading after any node failure in eureka will lose half of the requests --- apisix/discovery/eureka/init.lua | 42 -------------------------------- 1 file changed, 42 deletions(-) diff --git a/apisix/discovery/eureka/init.lua b/apisix/discovery/eureka/init.lua index aa010ec1632c..68c0e43106bb 100644 --- a/apisix/discovery/eureka/init.lua +++ b/apisix/discovery/eureka/init.lua @@ -179,7 +179,6 @@ local function fetch_full_registry(premature) return end - -- 遍历所有 eureka 端点,直到成功 local endpoints = build_endpoints() if not endpoints or #endpoints == 0 then return @@ -264,47 +263,6 @@ function _M.nodes(service_name) return applications[service_name] end - --- 注释掉文件缓存相关依赖与变量,避免写盘/读盘 --- local core_io = require("apisix.core.io") --- local io_open = io.open --- local cache_file = ngx.config.prefix() .. "logs/eureka_registry.json" --- local save_registry_cache --- local load_registry_cache --- 将文件缓存函数整体注释掉,避免写盘与读盘 ---[[ -local function save_registry_cache(apps) - local body, err = core.json.encode({ applications = apps, ts = ngx.now() }) - if not body then - log.error("encode eureka registry cache failed: ", err) - return - end - local f, ferr = io_open(cache_file, "w") - if not f then - log.error("open eureka registry cache file failed: ", ferr, - ", path: ", cache_file) - return - end - f:write(body) - f:close() -end - -local function load_registry_cache() - local body = core_io.get_file(cache_file) - if not body then - return nil, "no cache file" - end - local data, err = core.json.decode(body) - if not data or not data.applications then - return nil, "invalid cache format: " .. (err or "") - end - return data.applications -end -]] --- 移除保存缓存调用 --- save_registry_cache(up_apps) - - function _M.init_worker() default_weight = local_conf.discovery.eureka.weight or 100 log.info("default_weight:", default_weight, ".")