Skip to content

feat(upstream): filter nodes in upstream with metadata #12448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions apisix/discovery/consul/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,12 @@ function _M.connect(premature, consul_server, retry_delay)
local nodes = up_services[service_name]
local nodes_uniq = {}
for _, node in ipairs(result.body) do
if not node.Service then
local service = node.Service
if not service then
goto CONTINUE
end

local svc_address, svc_port = node.Service.Address, node.Service.Port
local svc_address, svc_port, metadata = service.Address, service.Port, service.Meta
-- Handle nil or 0 port case - default to 80 for HTTP services
if not svc_port or svc_port == 0 then
svc_port = 80
Expand All @@ -532,7 +533,8 @@ function _M.connect(premature, consul_server, retry_delay)
core.table.insert(nodes, {
host = svc_address,
port = tonumber(svc_port),
weight = default_weight,
weight = metadata and metadata.weight or default_weight,
metadata = metadata
})
nodes_uniq[service_id] = true
end
Expand Down
37 changes: 37 additions & 0 deletions apisix/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,35 @@ local function fill_node_info(up_conf, scheme, is_stream)
return true
end

local function match_metadata_filters(inst, filters)
local metadata = inst.metadata or {}
for _, f in ipairs(filters) do
local key = f.key
local allowed_vals = f.allowed_vals
local val = metadata[key]
local matched = false
for _, allowed in ipairs(allowed_vals) do
if val == allowed then
matched = true
break
end
end
if not matched then
return false
end
end
return true
end

local function match_nodes_by_metadata(nodes, filters)
local result = {}
for _, node in ipairs(nodes) do
if match_metadata_filters(node, filters) then
core.table.insert(result, node)
end
end
return result
end

function _M.set_by_route(route, api_ctx)
if api_ctx.upstream_conf then
Expand Down Expand Up @@ -310,6 +339,14 @@ function _M.set_by_route(route, api_ctx)
return HTTP_CODE_UPSTREAM_UNAVAILABLE, "no valid upstream node: " .. (err or "nil")
end

if up_conf.metadata_match then
Copy link
Preview

Copilot AI Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metadata_match field is used directly without validation of its structure. The code expects it to be an array of objects with 'key' and 'allowed_vals' fields, but the test shows it as a simple key-value object. This mismatch will cause the filtering to fail.

Suggested change
if up_conf.metadata_match then
if up_conf.metadata_match then
-- Validate and transform metadata_match into the expected format
if type(up_conf.metadata_match) == "table" and not (#up_conf.metadata_match > 0) then
local transformed_metadata = {}
for key, allowed_vals in pairs(up_conf.metadata_match) do
if type(allowed_vals) ~= "table" then
return HTTP_CODE_UPSTREAM_UNAVAILABLE, "invalid metadata_match format: allowed_vals must be a table"
end
table.insert(transformed_metadata, { key = key, allowed_vals = allowed_vals })
end
up_conf.metadata_match = transformed_metadata
elseif type(up_conf.metadata_match) ~= "table" or not (#up_conf.metadata_match > 0) then
return HTTP_CODE_UPSTREAM_UNAVAILABLE, "invalid metadata_match format: must be an array of objects with 'key' and 'allowed_vals'"
end

Copilot uses AI. Check for mistakes.

new_nodes = match_nodes_by_metadata(new_nodes, up_conf.metadata_match)
if #new_nodes == 0 then
return HTTP_CODE_UPSTREAM_UNAVAILABLE, "no matched node filterd by metadata_match"
end
core.log.info("filterd by metadata_match: ", core.json.delay_encode(new_nodes, true))
end

local same = upstream_util.compare_upstream_node(up_conf, new_nodes)
if not same then
if not up_conf._nodes_ver then
Expand Down
58 changes: 58 additions & 0 deletions t/discovery/consul.t
Original file line number Diff line number Diff line change
Expand Up @@ -781,3 +781,61 @@ location /sleep {
qr//
]
--- ignore_error_log

=== TEST 16: test metadata_match with consul discovery
--- yaml_config
apisix:
node_listen: 1984
config_center: yaml
discovery:
consul:
servers:
- http://127.0.0.1:8500
routes:
-
uri: /test_metadata_match
upstream:
service_name: mock-service
type: roundrobin
discovery_type: consul
metadata_match:
version:
- "v2"

--- config
location /v1/agent {
proxy_pass http://127.0.0.1:8500;
}
location /test_metadata_match {
content_by_lua_block {
local server = ngx.var.server_addr or "unknown"
ngx.say("Upstream server handling request: ", server)
}
}
--- timeout: 5
--- pipelined_requests eval
[
"PUT /v1/agent/service/register\n" . "{\"ID\":\"mock-node-v1\",\"Name\":\"mock-service\",\"Address\":\"127.0.0.1\",\"Port\":1984,\"Meta\":{\"version\":\"v1\",\"weight\":\"1\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
"PUT /v1/agent/service/register\n" . "{\"ID\":\"mock-node-v2\",\"Name\":\"mock-service\",\"Address\":\"127.0.0.2\",\"Port\":1984,\"Meta\":{\"version\":\"v2\",\"weight\":\"2\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",

"GET /test_metadata_match?run1",
"GET /test_metadata_match?run2",
"GET /test_metadata_match?run3",

"PUT /v1/agent/service/deregister/mock-node-v1",
"PUT /v1/agent/service/deregister/mock-node-v2"
]
--- response_body_like eval
[
qr//,
qr//,

qr/127.0.0.2/,
qr/127.0.0.2/,
qr/127.0.0.2/,

qr//,
qr//
]
--- no_error_log
[error]
61 changes: 61 additions & 0 deletions t/node/upstream-discovery.t
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,64 @@ qr/compare upstream nodes by value|fill node info for upstream/
fill node info for upstream
compare upstream nodes by value
fill node info for upstream

=== TEST 11: test metadata_match for upstream nodes selection
--- log_level: debug
--- apisix_yaml
routes:
-
uri: /hello
upstream:
type: roundrobin
nodes:
"127.0.0.1:1980":
weight: 1
metadata:
version: v1
"127.0.0.2:1980":
weight: 1
metadata:
version: v2
metadata_match:
version: v2
--- config
location /t {
content_by_lua_block {
local discovery = require("apisix.discovery.init").discovery
-- mock nodes with metadata
discovery.mock = {
nodes = function()
return {
{host = "127.0.0.1", port = 1980, weight = 1, metadata = {version = "v1"}},
{host = "127.0.0.2", port = 1980, weight = 1, metadata = {version = "v2"}},
}
end
}

local http = require "resty.http"
local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
local httpc = http.new()

-- do 5 requests and expect upstream node with metadata.version == v2 (127.0.0.2)
for i = 1, 5 do
local res = httpc:request_uri(uri, {method = "GET", keepalive = false})
if res.status ~= 200 then
ngx.say("request failed: ", res.status)
return
end
if not string.find(res.body, "127.0.0.2") then
ngx.say("unexpected upstream node: ", res.body)
return
end
end
ngx.say("pass")
}
}
--- response_body
pass
--- grep_error_log eval
qr/compare upstream nodes by value|fill node info for upstream|metadata_match/
--- grep_error_log_out
fill node info for upstream
compare upstream nodes by value
metadata_match found for upstream