Apache APISIX 完整架构剖析
从启动初始化到请求生命周期、从 Radixtree 路由到四大负载均衡算法,逐层拆解云原生 API 网关的设计精髓
什么是 APISIX
云原生高性能 API 网关
Apache APISIX 是一个基于 OpenResty (Nginx + LuaJIT) 的动态、实时、高性能 API 网关。它使用 etcd 作为配置中心,支持热加载路由、插件、上游等配置,无需重启即可生效。APISIX 提供了 140+ 个插件,涵盖认证鉴权、流量控制、可观测性、协议转换等能力。
核心能力一览
Radixtree 路由
基于基数树的高性能路由匹配,支持 URI、Host、Method、自定义变量等多维度条件
140+ 插件
覆盖认证、限流、可观测性、协议转换等场景,支持 Lua / WASM / 外部插件三种运行时
etcd 配置中心
基于 etcd Watch 机制实时同步配置,毫秒级生效,支持 Standalone YAML 模式
多算法负载均衡
RoundRobin / 一致性 Hash / EWMA / Least Conn 四大算法,集成主被动健康检查
项目结构总览
核心目录结构
apisix/
├── init.lua # ★ 主入口:HTTP/Stream 全部 OpenResty 阶段处理
├── plugin.lua # ★ 插件引擎:加载、排序、过滤、执行、合并
├── router.lua # ★ 路由引擎初始化:选择 radixtree 实现
├── upstream.lua # ★ 上游管理:节点解析、健康检查、TLS
├── balancer.lua # ★ 负载均衡:Picker 模式、重试、Keepalive
├── ssl.lua # SSL 证书管理:加解密、SNI 匹配
├── secret.lua # Secret 管理:Vault/AWS 集成
├── debug.lua # 动态 Debug:函数 Hook、条件日志
├── wasm.lua # WASM 插件支持:Proxy-WASM 兼容层
│
├── core/ # ★ 核心库(31 个模块)
│ ├── config_etcd.lua # etcd Watch 配置同步
│ ├── config_yaml.lua # Standalone YAML 配置
│ ├── config_util.lua # 配置工具函数
│ ├── ctx.lua # 请求上下文 & 变量元表
│ ├── lrucache.lua # LRU 缓存(带版本追踪)
│ ├── request.lua # 请求工具(Header/Body/Args)
│ ├── response.lua # 响应工具(Exit/Header/Body)
│ ├── table.lua # 表操作(deepcopy/merge/patch)
│ ├── json.lua # JSON 编解码(cjson + dkjson)
│ ├── event.lua # 事件发布/订阅
│ └── ... # dns, etcd, log, timer 等
│
├── http/ # HTTP 路由
│ ├── route.lua # 路由编译(URI → Radixtree 条目)
│ └── router/
│ ├── radixtree_uri.lua # URI 路由(默认)
│ ├── radixtree_host_uri.lua # Host + URI 路由
│ └── radixtree_uri_with_parameter.lua # 参数化路由
│
├── ssl/router/
│ └── radixtree_sni.lua # SNI 路由(TLS 证书选择)
│
├── balancer/ # 负载均衡算法
│ ├── roundrobin.lua # 加权轮询
│ ├── chash.lua # 一致性哈希
│ ├── ewma.lua # 指数加权移动平均
│ ├── least_conn.lua # 最小连接数
│ └── priority.lua # 优先级分组
│
├── plugins/ # ★ 140+ 插件实现
│ ├── key-auth.lua, jwt-auth.lua, ... # 认证
│ ├── limit-count.lua, limit-req.lua ... # 限流
│ ├── prometheus.lua, skywalking.lua ... # 可观测性
│ └── ...
│
├── stream/ # L4 TCP/UDP 代理
│ ├── xrpc.lua # xRPC 协议框架
│ ├── router/ip_port.lua# IP:Port 路由
│ └── plugins/ # 流式插件
│
├── admin/ # Admin API
│ ├── init.lua # API 入口 & 认证
│ ├── resource.lua # CRUD 基类
│ ├── routes.lua # 路由管理
│ ├── upstreams.lua # 上游管理
│ └── ... # services, consumers, ssl, ...
│
├── discovery/ # 服务发现
│ ├── init.lua # 发现框架
│ ├── consul/ # Consul
│ ├── nacos/ # Nacos
│ ├── eureka/ # Eureka
│ ├── kubernetes/ # Kubernetes
│ └── dns/ # DNS
│
├── control/ # Control API(运行时内省)
│ └── router.lua # 插件/发现的控制端点
│
└── cli/ # 命令行工具
├── apisix.lua # CLI 入口
└── ops.lua # init/start/stop/reload
整体架构
APISIX 架构全景图
┌─────────────────────────────────┐
│ etcd Cluster │
│ /routes /services /upstreams │
│ /ssl /consumers /global_rules │
└───────────┬─────────────────────┘
│ Watch (gRPC/HTTP)
┌───────────▼─────────────────────┐
│ config_etcd.lua │
│ Watch → produce_res → 订阅者 │
└───────────┬─────────────────────┘
│ 配置变更通知
┌───────────────────────────────▼──────────────────────────────┐
│ APISIX Worker Process │
│ │
│ ┌─── Router ───┐ ┌─── Plugin Engine ───┐ ┌── Balancer ──┐│
│ │ radixtree_uri │ │ load → sort → filter│ │ roundrobin ││
│ │ host_uri │ │ → run_plugin() │ │ chash / ewma ││
│ │ sni (SSL) │ │ 140+ 插件 │ │ least_conn ││
│ └───────┬───────┘ └────────┬────────────┘ └──────┬───────┘│
│ │ │ │ │
│ ┌───────▼───────────────────▼──────────────────────▼───────┐│
│ │ init.lua (请求生命周期) ││
│ │ ssl_phase → access_phase → balancer → filter → log ││
│ └──────────────────────────────────────────────────────────┘│
│ │
│ ┌── Core Libs ──────────────────────┐ ┌── Admin API ─────┐│
│ │ lrucache / tablepool / ctx / json │ │ CRUD / Auth ││
│ │ request / response / event / log │ │ Schema 校验 ││
│ └───────────────────────────────────┘ └──────────────────┘│
└──────────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌─── Upstream ───┐ ┌── Discovery ──┐
│ Backend Services│ │ Consul/Nacos │
│ (HTTP/gRPC/...)│ │ K8s/Eureka/DNS│
└────────────────┘ └───────────────┘
CLI 启动流程
从命令行到 Nginx 进程
APISIX 通过 apisix/cli/apisix.lua 启动,支持 init、start、stop、reload 等命令:
-- apisix/cli/apisix.lua — CLI 入口
-- 1. 设置 Lua 包搜索路径(依赖库、插件目录)
package.cpath = pkg_cpath .. pkg_cpath_org
package.path = pkg_path_deps .. pkg_path_org .. pkg_path_env
-- 2. 加载环境配置
local env = require("apisix.cli.env")(apisix_home, ...)
-- 3. 执行具体操作
local ops = require("apisix.cli.ops")
ops.execute(env, arg)
-- 支持命令:init / start / stop / reload / version / help
Worker 初始化序列
http_init() — Master 进程初始化
-- apisix/init.lua — Nginx master 进程启动时调用
function _M.http_init(args)
core.resolver.init_resolver(args) -- DNS 解析器
core.id.init() -- 实例 ID
core.env.init() -- 环境变量
ngx.process.enable_privileged_agent() -- 特权进程
core.config.init() -- ★ 连接 etcd / 加载配置
xrpc.init() -- xRPC 协议注册
end
http_init_worker() — 每个 Worker 进程初始化
-- apisix/init.lua — 每个 Worker 进程启动时调用
function _M.http_init_worker()
-- 1. 随机数种子(避免多 Worker 生成相同序列)
math.randomseed(ngx.worker.pid() + ngx.now())
-- 2. 核心服务初始化
core.events.init_worker() -- 事件总线
discovery.init_worker() -- ★ 服务发现(Consul/Nacos/K8s...)
balancer.init_worker() -- 负载均衡器
admin.init_worker() -- Admin API
-- 3. 配置同步启动
core.config.init_worker() -- ★ 启动 etcd Watch 线程
-- 4. 插件系统
plugin.init_worker() -- ★ 加载并排序所有插件
-- 5. 路由系统
router.http_init_worker() -- ★ 初始化 HTTP 路由器
-- 内部:加载 radixtree_uri / host_uri 路由实现
-- 内部:加载 radixtree_sni SSL 路由
-- 6. 资源同步(从 etcd 拉取并监听变更)
service.init_worker() -- /services
plugin_config.init_worker() -- /plugin_configs
consumer.init_worker() -- /consumers
consumer_group.init_worker() -- /consumer_groups
secret.init_worker() -- /secrets
global_rules.init_worker() -- /global_rules
upstream.init_worker() -- /upstreams
end
HTTP 请求全链路
一个请求在 APISIX 中的完整旅程
APISIX 的请求处理建立在 OpenResty 的阶段模型之上,每个阶段对应 init.lua 中的一个函数。以下是完整的处理链路:
http_access_phase() — 请求处理主入口
这是整个请求生命周期中最核心的函数(apisix/init.lua:571):
function _M.http_access_phase()
-- ========== 1. 上下文初始化 ==========
local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
ngx_ctx.api_ctx = api_ctx
core.ctx.set_vars_meta(api_ctx) -- 设置变量元表
-- ========== 2. URI 预处理 ==========
-- 可选:删除尾部斜杠
if local_conf.apisix.delete_uri_tail_slash then ... end
-- 可选:Servlet 风格 URI 规范化
if local_conf.apisix.normalize_uri_like_servlet then ... end
-- ========== 3. 路由匹配 ==========
router.router_http.match(api_ctx)
local route = api_ctx.matched_route
if not route then
-- 即使 404 也执行全局规则(prometheus 等可统计 404 请求)
plugin.run_global_rules(api_ctx, global_rules, nil)
return core.response.exit(404, {error_msg = "404 Route Not Found"})
end
-- ========== 4. 配置合并 ==========
-- 4a. Plugin Config 合并(补充 Route 未配置的插件)
if route.value.plugin_config_id then
route = plugin_config.merge(route, conf)
end
-- 4b. Service 合并(Route 插件覆盖 Service 同名插件)
if route.value.service_id then
route = plugin.merge_service_route(service, route)
end
-- ========== 5. 全局规则执行 ==========
plugin.run_global_rules(api_ctx, global_rules, nil)
-- 内部:遍历每条 global_rule → filter → run_plugin("rewrite") + run_plugin("access")
-- ========== 6. 路由插件 Rewrite 阶段 ==========
local plugins = plugin.filter(api_ctx, route) -- 按 priority 降序过滤
plugin.run_plugin("rewrite", plugins, api_ctx)
-- ========== 7. Consumer 识别与合并 ==========
if api_ctx.consumer then
-- 认证插件在 rewrite 阶段识别出 consumer
route, changed = plugin.merge_consumer_route(route, consumer, group_conf)
if changed then
-- 重新过滤 + 执行 consumer 新增插件的 rewrite
plugin.filter(api_ctx, route, plugins, nil, "rewrite_in_consumer")
plugin.run_plugin("rewrite_in_consumer", plugins, api_ctx)
end
end
-- ========== 8. 路由插件 Access 阶段 ==========
plugin.run_plugin("access", plugins, api_ctx)
-- ========== 9. 转发到上游 ==========
_M.handle_upstream(api_ctx, route, enable_websocket)
end
handle_upstream() — 上游处理
-- apisix/init.lua:463
function _M.handle_upstream(api_ctx, route, enable_websocket)
-- 1. 解析上游配置(by ID 或 inline)
local up_id = route.value.upstream_id
if up_id then
local upstream = apisix_upstream.get_by_id(up_id)
-- 设置到 api_ctx
end
-- 2. 服务发现节点解析(如果配置了 service_name)
apisix_upstream.set_by_route(route, api_ctx)
-- 3. 上游 TLS 配置(mTLS 到后端)
-- 设置客户端证书/密钥
-- 4. WebSocket 升级
if enable_websocket then
api_ctx.var.upstream_upgrade = ctx.var.http_upgrade
api_ctx.var.upstream_connection = ctx.var.http_connection
end
-- 5. ★ 负载均衡选择服务器
local server, err = load_balancer.pick_server(route, api_ctx)
-- 6. 设置上游请求头
set_upstream_headers(api_ctx, server)
-- 7. 执行 before_proxy 插件阶段
plugin.run_plugin("before_proxy", plugins, api_ctx)
-- 8. 协议路由
if api_ctx.upstream_scheme == "grpc" then
ngx.exec("@grpc_pass")
elseif api_ctx.upstream_scheme == "dubbo" then
ngx.exec("@dubbo_pass")
else
-- 标准 HTTP 代理(Nginx proxy_pass)
end
end
api_ctx 核心数据结构
贯穿请求全生命周期的上下文对象
api_ctx 通过 TablePool 分配,每个请求独占一个实例,在所有阶段之间传递状态:
api_ctx = {
-- 请求变量(通过 Lua 元表懒加载)
var = {
uri, host, remote_addr, method, ...
-- 支持 cookie_*, arg_*, http_*, graphql_*, uri_param_* 前缀
},
-- 路由匹配结果
matched_route = route_obj, -- 匹配的路由对象
matched_upstream = upstream_obj, -- 解析后的上游配置
matched_ssl = ssl_obj, -- 匹配的 SSL 证书
curr_req_matched = {}, -- 实际匹配的 host/uri
-- 插件状态
plugins = {obj, conf, obj, conf, ...}, -- 扁平数组
plugin_timing = {["key-auth"] = {rewrite = 0.001}}, -- 执行计时
_plugin_name = "current_plugin", -- 当前执行的插件名
-- Consumer 信息(认证后填充)
consumer = {username, plugins, group_id, ...},
consumer_name = "user_A",
consumer_ver = "version",
-- 上游信息
upstream_conf = {}, -- 上游完整配置
upstream_scheme = "http", -- http/https/grpc/grpcs/kafka
upstream_key = "...", -- 负载均衡缓存 key
upstream_version = "...", -- 配置版本
-- 负载均衡结果
picked_server = {host, port}, -- 选中的后端节点
balancer_ip = "10.0.0.1",
balancer_port = 8080,
up_checker = checker_obj, -- 健康检查器
-- 配置元信息
conf_type = "route&service&consumer",
conf_version = "v1#v2#v3",
conf_id = "route_1#svc_1#user_A",
-- 全局规则引用(供后续阶段使用)
global_rules = global_rules_list,
}
响应阶段与 common_phase
common_phase:统一的阶段调度器
除了 access 阶段外,其余阶段都通过 common_phase() 统一调度(init.lua:445):
local function common_phase(phase_name)
local api_ctx = ngx.ctx.api_ctx
-- ★ 先执行全局规则中该阶段的插件
plugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name)
-- ★ 再执行路由插件中该阶段的插件
return plugin.run_plugin(phase_name, nil, api_ctx)
end
-- header_filter 阶段:修改响应头
function _M.http_header_filter_phase()
core.response.set_header("Server", ver_header)
common_phase("header_filter")
end
-- body_filter 阶段:修改响应体
function _M.http_body_filter_phase()
common_phase("body_filter")
common_phase("delayed_body_filter")
end
-- log 阶段:日志记录 + 资源清理
function _M.http_log_phase()
local api_ctx = common_phase("log")
healthcheck_passive(api_ctx) -- 被动健康检查上报
core.ctx.release_vars(api_ctx) -- 释放变量
core.tablepool.release("plugins", api_ctx.plugins) -- 回收插件表
core.tablepool.release("api_ctx", api_ctx) -- 回收上下文
end
etcd 配置同步
基于 Watch 的实时配置同步
APISIX 使用 etcd 的 Watch API 实现配置的实时同步,变更可在毫秒级生效。核心逻辑在 apisix/core/config_etcd.lua。
Watch 线程架构
-- config_etcd.lua — 每个配置 key 一个 Watch 订阅者
-- 1. 创建配置对象(每种资源一个)
local routes_conf = core.config.new("/routes", {
automatic = true, -- 自动同步
item_schema = schema, -- JSON Schema 校验
filter = function(item) -- 变更回调
rebuild_router()
end,
})
-- 2. Watch 线程(全局共享,单线程监听所有 key)
local function run_watch()
-- 连接 etcd
local cli = etcd_cli.new(...)
-- 开始 Watch(从当前 revision 开始)
local res_func = cli:watchdir(prefix, {start_revision = rev})
while true do
local res = res_func() -- 阻塞等待变更事件
if res.result.created then
-- Watch 流创建成功
elseif res.result.canceled then
-- 处理 etcd compaction(触发全量重载)
else
-- ★ 发布变更事件给所有订阅者
produce_res(res)
end
end
end
-- 3. 事件发布(唤醒所有等待的订阅者)
local function produce_res(res, err)
insert_tab(watch_ctx.res, {res=res, err=err}) -- 写入事件队列
for _, sema in pairs(watch_ctx.sema) do
sema:post() -- ★ 唤醒等待的协程
end
end
同步数据处理
-- sync_data() — 处理 Watch 事件,更新本地数据
local function sync_data(self)
-- 从事件队列消费变更
local res = watch_ctx.res[self.idx]
for _, event in ipairs(res.result.events) do
local kv = event.kv
local key = kv.key -- e.g., "/apisix/routes/1"
if event.type == "DELETE" then
-- 从 self.values 中删除
fire_all_clean_handlers(item) -- 触发清理回调(健康检查器等)
else
-- PUT:JSON 解码 + Schema 校验
local item = core.json.decode(kv.value)
local ok, err = self.item_schema(item)
-- 更新到 self.values
self.values[index] = {
value = item,
modifiedIndex = kv.mod_revision,
key = key,
}
-- ★ 触发 filter 回调(如路由器重建)
if self.filter then
self.filter(item)
end
end
end
-- 更新配置版本号
self.conf_version = self.conf_version + 1
end
conf_version 自增。路由器、LRU 缓存等依赖此版本号判断是否需要重建/失效。这避免了不必要的计算开销。
Standalone YAML 模式
无 etcd 依赖的独立运行模式
-- config_yaml.lua — 文件轮询式配置加载
-- 1. 每秒检查文件修改时间
local function read_apisix_yaml(premature, pre_mtime)
local attributes = lfs.attributes(apisix_yaml)
if attributes.modification == pre_mtime then
return -- 文件未变,跳过
end
local content = io_open(apisix_yaml):read("*a")
-- ★ 必须以 #END 结尾(防止读到写了一半的文件)
if not content:find("#END") then
return -- 文件不完整,跳过
end
-- 解析 YAML + 环境变量替换
local conf = yaml.parse(content)
file.resolve_conf_var(conf)
-- 通知订阅者
apisix_yaml_ctime = attributes.modification
end
-- 2. 同步数据
local function sync_data(self)
-- 将 YAML 中对应 key 的数据转为内部格式
-- 使用文件修改时间作为版本号
item.modifiedIndex = apisix_yaml_ctime
end
apisix.yaml 文件并确保以 #END 结尾。
Router 路由引擎
可插拔的路由器架构
APISIX 支持多种路由实现,通过配置选择(apisix/router.lua):
-- router.lua — 路由器初始化
function _M.http_init_worker()
local conf = core.config.local_conf()
-- ★ 从配置中选择路由实现
local router_http_name = conf.apisix.router.http or "radixtree_uri"
local router_ssl_name = conf.apisix.router.ssl or "radixtree_sni"
-- 加载 HTTP 路由器
local router_http = require("apisix.http.router." .. router_http_name)
attach_http_router_common_methods(router_http)
router_http.init_worker(filter)
_M.router_http = router_http
-- 加载 SSL 路由器
local router_ssl = require("apisix.ssl.router." .. router_ssl_name)
router_ssl.init_worker()
_M.router_ssl = router_ssl
end
| 路由器 | 匹配维度 | 适用场景 |
|---|---|---|
radixtree_uri |
URI 路径 | 默认选择,大部分场景 |
radixtree_host_uri |
Host + URI | 多域名共享同一 APISIX 实例 |
radixtree_uri_with_parameter |
URI + 路径参数 | RESTful API(/users/:id) |
radixtree_sni |
TLS SNI | SSL 证书选择(始终使用) |
Radixtree 匹配算法
路由匹配的核心流程
radixtree_uri.lua 是默认的路由实现,基于 resty.radixtree 库的高性能前缀树匹配:
-- apisix/http/router/radixtree_uri.lua
function _M.match(api_ctx)
-- ★ 配置变更时重建路由树(惰性重建)
if not cached_router_version
or cached_router_version ~= user_routes.conf_version then
uri_router = base_router.create_radixtree_uri_router(
user_routes.values, uri_routes, false
)
cached_router_version = user_routes.conf_version
end
return _M.matching(api_ctx)
end
function _M.matching(api_ctx)
-- 构造匹配参数
local match_opts = core.tablepool.fetch("route_match_opts", 0, 16)
match_opts.method = api_ctx.var.request_method
match_opts.host = api_ctx.var.host
match_opts.remote_addr = api_ctx.var.remote_addr
match_opts.vars = api_ctx.var
-- ★ Radixtree 分派
local ok = uri_router:dispatch(api_ctx.var.uri, match_opts, api_ctx)
core.tablepool.release("route_match_opts", match_opts)
return ok
end
Host + URI 路由的特殊处理
-- radixtree_host_uri.lua — 两级路由匹配
function _M.matching(api_ctx)
-- ★ 反转域名用于前缀树匹配
-- "api.example.com" → "moc.elpmaxe.ipa"
-- 这样 *.example.com 可以用前缀匹配
local host_uri = host:reverse() .. "/" .. api_ctx.var.uri
-- 先尝试 Host 特定的路由树
if host_routes[host] then
local ok = host_routes[host]:dispatch(host_uri, match_opts, api_ctx)
if ok then return ok end
end
-- ★ 回退到纯 URI 路由
if only_uri_router then
return only_uri_router:dispatch(api_ctx.var.uri, match_opts, api_ctx)
end
end
*.example.com 变成 moc.elpmaxe.*,可以用前缀树高效匹配通配符域名。这个技巧也用于 SNI 路由。
路由编译与条件匹配
从 Route 配置到 Radixtree 条目
-- apisix/http/route.lua:34 — 路由编译
function _M.create_radixtree_uri_router(routes, uri_routes, with_parameter)
for _, route in ipairs(routes) do
if route.value.status ~= 0 then -- 跳过禁用的路由
-- 编译 filter_func(Lua 代码字符串)
local filter_fun = nil
if route.value.filter_func then
filter_fun = loadstring(
"return " .. route.value.filter_func,
"router#" .. route.value.id
)
end
-- ★ 构建 Radixtree 条目
core.table.insert(uri_routes, {
paths = route.value.uris or route.value.uri,
methods = route.value.methods,
priority = route.value.priority, -- 路由优先级
hosts = route.value.hosts,
remote_addrs = route.value.remote_addrs, -- CIDR 匹配
vars = route.value.vars, -- ★ 复杂条件表达式
filter_fun = filter_fun, -- ★ 自定义过滤函数
handler = function(api_ctx, match_opts)
api_ctx.matched_route = route
api_ctx.curr_req_matched = match_opts.matched
end
})
end
end
return radixtree.new(uri_routes)
end
vars 条件表达式
// 路由配置中的 vars 字段:复杂条件匹配
{
"uri": "/api/*",
"vars": [
["arg_version", "==", "v2"],
["http_x-tenant", "~~", "^(gold|platinum)$"]
]
}
// 含义:查询参数 version=v2 且请求头 X-Tenant 匹配 gold 或 platinum
// 支持的操作符:
// ==, ~=, >, >=, <, <= — 比较
// ~~ — 正则匹配
// in, has — 集合操作
// ! — 取反
Upstream 上游管理
上游的完整数据模型
-- apisix/upstream.lua — 上游配置结构
upstream = {
id = "unique_id",
type = "roundrobin", -- roundrobin / chash / ewma / least_conn
-- ★ 节点列表
nodes = {
{host = "10.0.0.1", port = 8080, weight = 100, priority = 0},
{host = "10.0.0.2", port = 8080, weight = 50, priority = 0},
{host = "backup.svc", port = 8080, weight = 100, priority = -1},
},
-- 或通过服务发现
service_name = "my-service",
discovery_type = "consul",
-- 超时配置
timeout = {connect = 6, send = 6, read = 6},
-- 连接池
keepalive_pool = {
size = 32, -- 连接池大小
requests = 100, -- 每连接最大请求数
idle_timeout = 60, -- 空闲超时
},
-- ★ 健康检查
checks = {
active = {
type = "http",
http_path = "/health",
healthy = {interval = 1, successes = 2},
unhealthy = {interval = 1, http_statuses = {500, 502, 503}},
},
passive = {
healthy = {http_statuses = {200, 302}, successes = 5},
unhealthy = {http_statuses = {500, 502}, tcp_failures = 2},
}
},
-- 上游 mTLS
tls = {client_cert_id = "cert_1"},
-- 重试
retries = 3,
retry_timeout = 10,
}
节点自动补全
-- upstream.lua:200 — fill_node_info()
-- 自动填充缺失的 port 和 priority
local function fill_node_info(up_conf, scheme)
for _, node in ipairs(up_conf.nodes) do
if not node.port then
-- 根据 scheme 自动推断端口
if scheme == "https" or scheme == "grpcs" then
node.port = 443
else
node.port = 80
end
end
if not node.priority then
node.priority = 0 -- 默认优先级
end
end
end
Balancer 负载均衡
Picker 模式:可插拔的均衡器
APISIX 的负载均衡采用 Picker 模式:每种算法是一个独立模块,实现统一的 new() 和 get() 接口(apisix/balancer.lua):
-- balancer.lua:98 — 创建 Picker 实例
local function create_server_picker(upstream, checker)
-- 动态加载负载均衡模块
local picker = pickers[upstream.type]
if not picker then
pickers[upstream.type] = require("apisix.balancer." .. upstream.type)
picker = pickers[upstream.type]
end
-- ★ 获取健康节点
local up_nodes = fetch_health_nodes(upstream, checker)
-- ★ 如果有多优先级组,使用优先级均衡器包装
if #up_nodes._priority_index > 1 then
return priority_balancer.new(up_nodes, upstream, picker)
else
return picker.new(up_nodes[up_nodes._priority_index[1]], upstream)
end
end
-- balancer.lua:194 — 选择服务器
local function pick_server(route, ctx)
-- ★ LRU 缓存 Picker(按 upstream_key + version)
local server_picker = lrucache_server_picker(
ctx.upstream_key, ctx.upstream_version,
create_server_picker, upstream, checker
)
-- ★ 选择节点
local server, err = server_picker.get(ctx)
ctx.balancer_ip = server.host
ctx.balancer_port = server.port
return server
end
Balancer Phase:重试与故障转移
-- balancer.lua:342 — Nginx balancer_by_lua 阶段
function _M.run(route, ctx, plugin_funcs)
if ctx.picked_server then
-- ★ 首次:使用 access 阶段选好的服务器
server = ctx.picked_server
ctx.picked_server = nil
set_balancer_opts(route, ctx) -- 设置超时和重试次数
else
-- ★ 重试:上游失败后重新选择
if ctx.proxy_retry_deadline and ctx.proxy_retry_deadline < ngx_now() then
return core.response.exit(502) -- 重试超时
end
-- 上报失败到健康检查器
if ctx.balancer_ip then
checker:report_http_status(ctx.balancer_ip, ctx.balancer_port, ...)
end
-- 重新选择节点
server = pick_server(route, ctx)
-- 重新执行 before_proxy 插件
plugin_funcs("before_proxy")
end
-- ★ 设置当前上游节点
set_current_peer(server, ctx)
ctx.proxy_passed = true
end
四大均衡算法源码
1. RoundRobin 加权轮询
-- balancer/roundrobin.lua — 基于 resty.roundrobin 库
function _M.new(up_nodes, upstream)
local picker = rr_obj:new(up_nodes) -- {addr = weight, ...}
return {
get = function(ctx)
-- 选择下一个节点,跳过已尝试的
local server = picker:find()
while tried[server] do
server = picker:find()
end
return parse_server(server)
end,
after_balance = function(ctx, before_retry)
if before_retry then
tried[last_server] = true -- 记录失败节点
else
tried = {} -- 成功后清空
end
end
}
end
2. Consistent Hash 一致性哈希
-- balancer/chash.lua — 基于 resty.chash 库
function _M.new(up_nodes, upstream)
-- 计算 GCD 归一化权重
local gcd = gcd_calc(weights)
local chash = resty_chash:new(normalized_nodes)
return {
get = function(ctx)
-- ★ 根据配置的 key 计算哈希值
local hash_key
if upstream.hash_on == "consumer" then
hash_key = ctx.consumer_name
elseif upstream.hash_on == "header" then
hash_key = ctx.var["http_" .. upstream.key]
elseif upstream.hash_on == "cookie" then
hash_key = ctx.var["cookie_" .. upstream.key]
elseif upstream.hash_on == "vars" then
hash_key = ctx.var[upstream.key]
end
-- 回退到 remote_addr
hash_key = hash_key or ctx.var.remote_addr
return chash:find(hash_key)
end
}
end
3. EWMA 指数加权移动平均
-- balancer/ewma.lua — 基于响应时间的自适应均衡
-- 核心公式:score = ewma(RTT) * (1 + active_connections)
-- 衰减窗口:10 秒
local DECAY_TIME = 10 -- 秒
local function get_or_update_ewma(upstream, rtt, old)
local now = ngx.now()
local last_touched_at = shm:get(key .. "_last_touched")
local td = now - last_touched_at
-- ★ 指数衰减:td 越大,旧值权重越低
local weight = math.exp(-td / DECAY_TIME)
local new_ewma = old * weight + rtt * (1 - weight)
shm:set(key, new_ewma)
shm:set(key .. "_last_touched", now)
return new_ewma
end
function get(ctx)
-- ★ 随机选 2 个节点,取分数低的(P2C 策略)
local a, b = random_pick_two(nodes)
local score_a = ewma[a] * (1 + get_connections(a))
local score_b = ewma[b] * (1 + get_connections(b))
return score_a < score_b and a or b
end
4. Least Conn 最小连接数
-- balancer/least_conn.lua — 基于二叉堆的最小连接数均衡
function _M.new(up_nodes, upstream)
-- ★ 构建最小堆,score = connections / weight
local heap = binary_heap.new()
for addr, weight in pairs(up_nodes) do
local score = (1 / weight) * 0 -- 初始 0 连接
heap:push({addr = addr, weight = weight, score = score})
end
return {
get = function(ctx)
-- ★ 弹出分数最低的节点
local entry = heap:pop()
entry.score = entry.score + (1 / entry.weight) -- 连接数+1
heap:push(entry) -- 重新入堆
return entry.addr
end,
after_balance = function(ctx, before_retry)
-- 释放连接:分数 - (1/weight)
entry.score = entry.score - (1 / entry.weight)
end
}
end
健康检查机制
主动 + 被动双模健康检查
-- upstream.lua — 健康检查器创建
local function create_checker(upstream)
local checker = healthcheck.new({
name = "upstream#" .. upstream.key,
shm_name = "upstream-healthcheck", -- 共享内存
checks = upstream.checks,
})
-- ★ 注册所有节点为检查目标
for _, node in ipairs(upstream.nodes) do
checker:add_target(node.host, node.port, host_header, true)
end
return checker
end
-- ★ 健康节点过滤
local function fetch_health_nodes(upstream, checker)
if not checker then
return all_nodes -- 无健康检查,返回全部
end
local up_nodes = {}
for _, node in ipairs(nodes) do
local ok = checker:get_target_status(node.host, node.port)
if ok then
up_nodes[addr] = weight
end
end
-- ★ 全部不健康时,回退到全部节点(避免完全不可用)
if is_empty(up_nodes) then
return all_nodes
end
return up_nodes
end
-- ★ 被动健康检查上报(log 阶段)
local function healthcheck_passive(api_ctx)
local checker = api_ctx.up_checker
local resp_status = ngx.status
-- 上报 HTTP 状态码给检查器
checker:report_http_status(
api_ctx.balancer_ip, api_ctx.balancer_port,
host, resp_status
)
end
服务发现框架
可插拔的服务发现接口
-- discovery/init.lua — 服务发现框架
local discovery = {}
function _M.init_worker()
local discovery_type = local_conf.discovery
for name, _ in pairs(discovery_type) do
discovery[name] = require("apisix.discovery." .. name)
discovery[name].init_worker()
end
end
-- 统一接口:返回服务节点列表
-- discovery[type].nodes(service_name) → [{host, port, weight}, ...]
| 发现类型 | 数据源 | 同步方式 |
|---|---|---|
consul | Consul Catalog API | Long Polling(阻塞查询 + Index 追踪) |
nacos | Nacos Naming API | 定时轮询 |
eureka | Eureka REST API | 定时拉取 |
kubernetes | K8s Service/Endpoints | Watch API |
dns | DNS A/SRV 记录 | TTL 过期刷新 |
SSL 证书管理
证书加密存储与动态加载
-- ssl.lua — 证书管理
-- 1. 证书缓存
local cert_cache = core.lrucache.new({ttl = 3600, count = 1024})
local pkey_cache = core.lrucache.new({ttl = 3600, count = 1024})
-- 2. 密钥加密存储(AES-128-CBC + Base64)
function _M.aes_encrypt_pkey(origin, field)
local aes = get_aes_128_cbc_with_iv(local_conf)
local encrypted = aes:encrypt(origin)
return ngx_encode_base64(encrypted)
end
-- 3. 解密时尝试多个密钥环条目
function _M.aes_decrypt_pkey(origin, field)
for _, key in ipairs(keyring) do
local aes = aes_128_cbc:new(key, nil, iv)
local decrypted = aes:decrypt(ngx_decode_base64(origin))
if decrypted then return decrypted end
end
end
-- 4. 证书验证
function _M.validate(cert, key)
local parsed_cert = ngx_ssl.parse_pem_cert(cert)
local parsed_key = ngx_ssl.parse_pem_priv_key(key)
return parsed_cert and parsed_key
end
SNI 路由匹配
TLS 握手阶段的证书选择
-- ssl/router/radixtree_sni.lua — SNI 路由
local function create_router(ssl_items)
local route_items = {}
for _, ssl in config_util.iterate_values(ssl_items) do
if ssl.value.type == "server" and ssl.value.status ~= 0 then
-- ★ 反转 SNI 用于前缀树匹配
local sni = {}
for _, s in ipairs(ssl.value.snis) do
table.insert(sni, s:reverse())
-- "*.example.com" → "moc.elpmaxe.*"
end
core.table.insert(route_items, {
paths = sni,
handler = function(api_ctx)
api_ctx.matched_ssl = ssl
end
})
end
end
return radixtree.new(route_items)
end
-- 在 TLS ClientHello 阶段调用:
-- http_ssl_client_hello_phase() → radixtree_sni.match() → 设置证书
Core 核心模块总览
core.lua — 门面模式聚合所有核心服务
-- apisix/core.lua — 统一入口
return {
version = version,
log = require("apisix.core.log"),
config = require("apisix.core.config_" .. config_provider), -- etcd/yaml/xds
json = require("apisix.core.json"),
table = require("apisix.core.table"),
request = require("apisix.core.request"),
response = require("apisix.core.response"),
lrucache = require("apisix.core.lrucache"),
ctx = require("apisix.core.ctx"),
tablepool = require("tablepool"), -- OpenResty 表池
dns_client= require("apisix.core.dns.client"),
schema = require("apisix.core.schema"),
etcd = require("apisix.core.etcd"),
event = require("apisix.core.event"),
-- ...
}
-- 配置提供者选择
local config_provider = local_conf.deployment.config_provider or "etcd"
-- 支持:etcd / yaml / xds
LRU Cache 缓存系统
带版本追踪的多级缓存
-- core/lrucache.lua — 核心缓存机制
-- 全局级缓存:1024 条目,60 分钟 TTL
GLOBAL_ITEMS_COUNT = 1024
GLOBAL_TTL = 60 * 60
-- 插件级缓存:8 条目,5 分钟 TTL
PLUGIN_ITEMS_COUNT = 8
PLUGIN_TTL = 5 * 60
-- ★ 核心接口:key + version → 缓存或创建
function new_lru_fun(opts)
local lru_obj = lru_new(opts.count)
return function(key, version, create_obj_fun, ...)
-- 1. 查缓存
local cache_obj = lru_obj:get(key)
if cache_obj and cache_obj.ver == version then
return cache_obj.val -- ★ 版本匹配,命中缓存
end
-- 2. 未命中:调用创建函数
local obj, err = create_obj_fun(...)
-- 3. 写入缓存(带版本号)
if obj ~= nil then
lru_obj:set(key, {val = obj, ver = version}, ttl)
end
return obj, err
end
end
-- 使用示例:
local merged_route = core.lrucache.new({ttl = 300, count = 512})
-- key = route_id#service_version, version = service.modifiedIndex
-- 配置变更时 version 变化 → 缓存自动失效 → 重新合并
local result = merged_route(route_key, service_version,
merge_service_route, service_conf, route_conf)
version 参数判断缓存是否有效。etcd 的 modifiedIndex 天然适合作为版本号 — 每次配置变更 Index 递增,缓存自动失效。
ctx.var 变量元表
Lua 元表实现的惰性变量解析
-- core/ctx.lua — 变量访问的核心机制
-- ★ 通过 __index 元方法实现按需计算
local mt = {
__index = function(t, key)
-- 先查缓存
local cached = t._cache[key]
if cached ~= nil then return cached end
-- ★ 按前缀路由到不同数据源
if key:sub(1, 7) == "cookie_" then
-- 从 Cookie 中解析
val = parse_cookie(key:sub(8))
elseif key:sub(1, 4) == "arg_" then
-- 从查询参数中获取
val = ngx.var["arg_" .. key:sub(5)]
elseif key:sub(1, 5) == "http_" then
-- 从请求头获取
val = ngx.var[key]
elseif key:sub(1, 9) == "graphql_" then
-- ★ 解析 GraphQL 查询体
val = parse_graphql(key:sub(10))
elseif key:sub(1, 10) == "uri_param_" then
-- 从路径参数获取(radixtree_uri_with_parameter)
val = api_ctx.curr_req_matched[key:sub(11)]
else
-- 回退到 ngx.var
val = ngx.var[key]
end
t._cache[key] = val -- 缓存结果
return val
end
}
-- ★ 支持自定义变量注册
function _M.register_var(name, opts, func)
-- 插件可以注册自定义变量
-- 如 prometheus 注册 apisix_route_name 变量
end
TablePool 对象池
减少 GC 压力的关键优化
-- APISIX 大量使用 OpenResty 的 tablepool 复用 Lua table
-- 分配(从池中取或新建)
local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
-- "api_ctx" = 池名,0 = 数组部分大小,32 = 哈希部分大小
-- 使用...
api_ctx.matched_route = route
api_ctx.plugins = plugins
-- 回收(清空后放回池中)
core.tablepool.release("api_ctx", api_ctx)
-- ★ 主要使用场景:
-- "api_ctx" — 请求上下文(每请求一个)
-- "plugins" — 插件列表(每请求一个)
-- "route_match_opts" — 路由匹配参数
-- "tmp_plugin_objs" — 插件排序临时表
Admin API 架构
RESTful 资源管理
-- admin/init.lua — Admin API 入口
-- 支持的资源
local resources = {
routes = require("apisix.admin.routes"),
services = require("apisix.admin.services"),
upstreams = require("apisix.admin.upstreams"),
consumers = require("apisix.admin.consumers"),
ssls = require("apisix.admin.ssl"),
plugins = require("apisix.admin.plugins"),
global_rules = require("apisix.admin.global_rules"),
plugin_configs = require("apisix.admin.plugin_config"),
consumer_groups = require("apisix.admin.consumer_group"),
secrets = require("apisix.admin.secrets"),
-- ...
}
-- 请求处理
function run()
-- 1. 认证:检查 API Key
local ok = check_token(ctx)
-- 支持 X-API-KEY header / api_key query / cookie
-- 2. 解析 URI: /apisix/admin/{resource}/{id}/{sub_path}
local resource = resources[seg_res]
-- 3. 分派到对应方法
local code, body = resource[method](resource, seg_id, req_body, ...)
-- GET / POST / PUT / DELETE / PATCH
end
CRUD 资源管理
resource.lua — 通用 CRUD 基类
-- admin/resource.lua — 所有资源的基类
-- PUT: 创建或更新
function _M:put(id, conf, sub_path, args)
-- 1. Schema 校验
local ok, err = self.checker(id, conf, true)
-- 2. 写入 etcd
local res, err = core.etcd.set(key, conf, args.ttl)
return 200, {key = key, value = conf}
end
-- DELETE: 删除(带引用完整性检查)
function _M:delete(id)
-- 1. 检查是否被其他资源引用
if self.delete_checker then
local ok, err = self.delete_checker(id)
-- 如:删除 upstream 前检查是否有 route/service 引用它
end
-- 2. 从 etcd 删除
local res, err = core.etcd.delete(key)
end
-- PATCH: 局部更新(支持嵌套路径)
function _M:patch(id, conf, sub_path)
-- 1. 获取当前配置
local old_conf = core.etcd.get(key)
-- 2. 合并变更(支持 /plugins/limit-count 这样的子路径)
core.table.patch(old_conf, sub_path, conf)
-- 3. 写回 etcd
core.etcd.set(key, old_conf)
end
Stream L4 代理
TCP/UDP 四层代理
-- init.lua — Stream 子系统处理
-- L4 路由匹配:基于 IP:Port
function _M.stream_preread_phase()
local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
-- IP:Port 路由匹配
router.router_stream.match(api_ctx)
-- 解析上游
local up_id = matched_route.value.upstream_id
api_ctx.matched_upstream = apisix_upstream.get_by_id(up_id)
-- 流式插件执行
local plugins = plugin.stream_filter(matched_route)
plugin.run_plugin("preread", plugins, api_ctx)
-- ★ xRPC 协议支持(Redis、Dubbo 等)
if matched_route.value.protocol then
xrpc.run_protocol(matched_route.value.protocol, api_ctx)
end
end
xRPC 协议框架
-- stream/xrpc.lua — 可扩展的 RPC 协议支持
local registered_protocols = {}
function _M.register_protocol(name, is_http)
registered_protocols[name] =
require("apisix.stream.xrpc.protocols." .. name)
end
-- 支持的协议:
-- Redis Protocol → stream/xrpc/protocols/redis/
-- Dubbo Protocol → stream/xrpc/protocols/dubbo/
WASM 插件支持
Proxy-WASM 兼容层
-- wasm.lua — WebAssembly 插件运行时
-- 加载 WASM 二进制
function _M.require(attrs)
wasm.load(attrs.name, attrs.file)
end
-- 执行阶段映射:
-- WASM on_http_request_headers → rewrite 或 access 阶段
-- WASM on_http_request_body → 按需执行
-- WASM on_http_response_headers → header_filter 阶段
-- WASM on_http_response_body → body_filter 阶段
-- 配置传递:JSON 序列化后传入 WASM 运行时
local json_conf = core.json.encode(plugin_conf)
wasm.on_configure(plugin_ctx, json_conf)
Secret 密钥管理
外部密钥管理器集成
-- secret.lua — 密钥引用与解析
-- ★ Secret URI 格式:
-- $secret://vault/kv-store/database_password
-- $secret://aws/my-secrets/api_key
-- 解析流程
function fetch_secrets(refs, cache_enabled, key, version)
local function retrieve(refs)
for k, v in pairs(refs) do
if type(v) == "string" and v:sub(1, #PREFIX) == PREFIX then
-- 解析 URI → manager / conf_id / secret_key
local manager, conf_id, secret_key = parse_uri(v)
-- 从密钥管理器获取真实值
refs[k] = managers[manager].get(conf, secret_key)
elseif type(v) == "table" then
retrieve(v) -- 递归处理嵌套
end
end
end
-- ★ 带缓存的密钥获取
if cache_enabled then
return secrets_lrucache(key, version, retrieve, refs)
end
return retrieve(refs)
end
-- 支持的管理器:HashiCorp Vault / AWS Secrets Manager
动态 Debug 模式
运行时函数 Hook 系统
-- debug.lua — 动态调试(无需重启)
-- 配置文件:conf/debug.yaml
-- basic:
-- enable: true
-- http_filter:
-- enable: true
-- enable_header_name: "X-Debug" ← 请求头触发调试
-- hook_conf:
-- enable: true
-- is_print_input_args: true
-- is_print_return_value: true
-- hooks:
-- apisix.balancer:
-- - pick_server
-- apisix.plugin:
-- - run_plugin
-- ★ Hook 机制:用 Lua 元表包装目标函数
local function apply_new_fun(module, fun_name, file_path, hook_conf)
local fun_org = module[fun_name]
local t = {fun_org = fun_org}
local mt = {}
function mt.__call(self, ...)
-- 打印入参
if hook_conf.is_print_input_args then
log.warn("call ", fun_name, "() args:", inspect({...}))
end
-- 调用原始函数
local ret = {self.fun_org(...)}
-- 打印返回值
if hook_conf.is_print_return_value then
log.warn("call ", fun_name, "() return:", inspect(ret))
end
return unpack(ret)
end
setmetatable(t, mt)
module[fun_name] = t -- ★ 替换原函数
end
-- 每秒检查 debug.yaml 变更,热加载 Hook 配置
核心设计模式
APISIX 源码中的设计精华
元表驱动的惰性求值
ctx.var 通过 __index 元方法实现按需计算变量,避免预计算所有变量的开销。Cookie、GraphQL 查询等昂贵操作只在真正访问时才执行
版本追踪的 LRU 缓存
不依赖 TTL 过期,而是通过 etcd modifiedIndex 作为版本号。配置变更时版本递增,缓存自动失效。兼顾了实时性和性能
TablePool 对象复用
高频创建的 api_ctx、plugins 等 table 通过对象池复用,避免 GC 抖动。这是 APISIX 在高并发下保持低延迟的关键
惰性路由重建
路由树不在配置变更时立即重建,而是在下一次请求匹配时检查版本号,按需重建。避免了频繁配置变更导致的无效重建
域名反转前缀匹配
将域名反转后用 Radixtree 前缀树匹配,巧妙地将通配符域名(*.example.com)转化为前缀匹配问题
扁平数组插件存储
插件列表用 [obj, conf, obj, conf, ...] 扁平数组存储,步长为 2 遍历。避免了为每个插件创建额外的包装对象,减少内存分配
总结
APISIX 的核心文件速查
| 文件 | 职责 | 关键函数 |
|---|---|---|
init.lua |
请求生命周期 | http_access_phase, handle_upstream, common_phase |
plugin.lua |
插件引擎 | load, filter, run_plugin, run_global_rules |
router.lua |
路由器初始化 | http_init_worker |
upstream.lua |
上游管理 | set_by_route, fill_node_info, create_checker |
balancer.lua |
负载均衡 | pick_server, run, create_server_picker |
core/config_etcd.lua |
配置同步 | run_watch, sync_data, produce_res |
core/lrucache.lua |
缓存系统 | new_lru_fun(key + version → val) |
core/ctx.lua |
请求上下文 | set_vars_meta(元表惰性求值) |
ssl.lua |
证书管理 | aes_encrypt_pkey, validate |
admin/init.lua |
Admin API | run, check_token |