Skip to content

Commit 0c3547e

Browse files
authored
feat: support proxying via unix socket (#160)
Signed-off-by: spacewander <[email protected]>
1 parent 3deb202 commit 0c3547e

File tree

3 files changed

+211
-3
lines changed

3 files changed

+211
-3
lines changed

api_v3.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ Method
4646
- `serializer`: string - serializer type, default `json`, also support `raw` to keep origin string value.
4747
- `extra_headers`: table - adding custom headers for etcd requests.
4848
- `sni`: string - adding custom SNI fot etcd TLS requests.
49+
- `unix_socket_proxy`: string - the unix socket path which will be used to proxy the etcd request.
4950

5051
The client method returns either a `etcd` object or an `error string`.
5152

lib/resty/etcd/v3.lua

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ local str_byte = string.byte
1414
local str_char = string.char
1515
local ipairs = ipairs
1616
local pairs = pairs
17+
local unpack = unpack
1718
local re_match = ngx.re.match
1819
local type = type
1920
local tab_insert = table.insert
@@ -56,6 +57,60 @@ local function choose_endpoint(self)
5657
end
5758

5859

60+
local function request_uri_via_unix_socket(self, uri, params)
61+
local parsed_uri, err = self:parse_uri(uri, false)
62+
if not parsed_uri then
63+
return nil, err
64+
end
65+
66+
local path, query
67+
params.scheme, params.host, params.port, path, query = unpack(parsed_uri)
68+
if params.unix_socket_proxy then
69+
if not params.headers then
70+
params.headers = {}
71+
end
72+
73+
params.headers["Host"] = params.host
74+
params.host = params.unix_socket_proxy
75+
params.port = nil
76+
end
77+
78+
params.path = params.path or path
79+
params.query = params.query or query
80+
params.ssl_server_name = params.ssl_server_name or params.host
81+
82+
local res
83+
res, err = self:connect(params)
84+
if not res then
85+
return nil, err
86+
end
87+
88+
res, err = self:request(params)
89+
if not res then
90+
self:close()
91+
return nil, err
92+
end
93+
94+
local body
95+
body, err = res:read_body()
96+
if not body then
97+
self:close()
98+
return nil, err
99+
end
100+
101+
res.body = body
102+
103+
if params.keepalive == false then
104+
self:close()
105+
106+
else
107+
self:set_keepalive(params.keepalive_timeout, params.keepalive_pool)
108+
end
109+
110+
return res, nil
111+
end
112+
113+
59114
local function http_request_uri(self, http_cli, method, uri, body, headers, keepalive)
60115
local endpoint, err = choose_endpoint(self)
61116
if not endpoint then
@@ -70,7 +125,7 @@ local function http_request_uri(self, http_cli, method, uri, body, headers, keep
70125
end
71126

72127
local res
73-
res, err = http_cli:request_uri(full_uri, {
128+
res, err = request_uri_via_unix_socket(http_cli, full_uri, {
74129
method = method,
75130
body = body,
76131
headers = headers,
@@ -79,6 +134,7 @@ local function http_request_uri(self, http_cli, method, uri, body, headers, keep
79134
ssl_cert_path = self.ssl_cert_path,
80135
ssl_key_path = self.ssl_key_path,
81136
ssl_server_name = self.sni,
137+
unix_socket_proxy = self.unix_socket_proxy,
82138
})
83139

84140
if err then
@@ -199,6 +255,7 @@ function _M.new(opts)
199255
local serializer = opts.serializer
200256
local extra_headers = opts.extra_headers
201257
local sni = opts.sni
258+
local unix_socket_proxy = opts.unix_socket_proxy
202259

203260
if not typeof.uint(timeout) then
204261
return nil, 'opts.timeout must be unsigned integer'
@@ -228,6 +285,10 @@ function _M.new(opts)
228285
return nil, 'opts.password must be string or ignore'
229286
end
230287

288+
if unix_socket_proxy and not typeof.string(unix_socket_proxy) then
289+
return nil, 'opts.unix_socket_proxy must be string or ignore'
290+
end
291+
231292
local endpoints = {}
232293
local http_hosts
233294
if type(http_host) == 'string' then -- signle node
@@ -243,12 +304,25 @@ function _M.new(opts)
243304
return nil, "invalid http host: " .. host .. ", err: " .. (err or "not matched")
244305
end
245306

307+
local addr
308+
if unix_socket_proxy then
309+
addr = unix_socket_proxy
310+
else
311+
addr = m[2] or "127.0.0.1"
312+
end
313+
314+
local port
315+
if not unix_socket_proxy then
316+
port = m[3] or "2379"
317+
end
318+
246319
tab_insert(endpoints, {
247320
full_prefix = host .. utils.normalize(api_prefix),
248321
http_host = host,
249322
scheme = m[1],
250323
host = m[2] or "127.0.0.1",
251-
port = m[3] or "2379",
324+
address = addr,
325+
port = port,
252326
api_prefix = api_prefix,
253327
})
254328
end
@@ -282,6 +356,7 @@ function _M.new(opts)
282356
ssl_key_path = opts.ssl_key_path,
283357
extra_headers = extra_headers,
284358
sni = sni,
359+
unix_socket_proxy = unix_socket_proxy,
285360
},
286361
mt)
287362
end
@@ -538,7 +613,7 @@ local function http_request_chunk(self, http_cli)
538613
local ok
539614
ok, err = http_cli:connect({
540615
scheme = endpoint.scheme,
541-
host = endpoint.host,
616+
host = endpoint.address,
542617
port = endpoint.port,
543618
ssl_verify = self.ssl_verify,
544619
ssl_cert_path = self.ssl_cert_path,
@@ -625,6 +700,10 @@ local function request_chunk(self, method, path, opts, timeout)
625700
return nil, err
626701
end
627702

703+
if self.unix_socket_proxy then
704+
headers["Host"] = endpoint.host
705+
end
706+
628707
local res
629708
res, err = http_cli:request({
630709
method = method,

t/v3/unix_socket.t

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
use Test::Nginx::Socket::Lua;
2+
3+
log_level('info');
4+
no_long_string();
5+
repeat_each(1);
6+
7+
my $test_dir = html_dir();
8+
$ENV{TEST_NGINX_HTML_DIR} ||= $test_dir;
9+
10+
my $etcd_version = `etcd --version`;
11+
if ($etcd_version =~ /^etcd Version: 2/ || $etcd_version =~ /^etcd Version: 3.1./) {
12+
plan(skip_all => "etcd is too old, skip v3 protocol");
13+
} else {
14+
plan 'no_plan';
15+
}
16+
17+
our $HttpConfig = <<"_EOC_";
18+
lua_socket_log_errors off;
19+
lua_package_path 'lib/?.lua;/usr/local/share/lua/5.1/?.lua;;';
20+
init_by_lua_block {
21+
local cjson = require("cjson.safe")
22+
23+
function check_res(data, err, val, status)
24+
if err then
25+
ngx.say("err: ", err)
26+
ngx.exit(200)
27+
end
28+
29+
if val then
30+
if data and data.body.kvs==nil then
31+
ngx.exit(404)
32+
end
33+
if data and data.body.kvs and val ~= data.body.kvs[1].value then
34+
ngx.say("failed to check value")
35+
ngx.log(ngx.ERR, "failed to check value, got: ", data.body.kvs[1].value,
36+
", expect: ", val)
37+
ngx.exit(200)
38+
else
39+
ngx.say("checked val as expect: ", val)
40+
end
41+
end
42+
43+
if status and status ~= data.status then
44+
ngx.exit(data.status)
45+
end
46+
end
47+
}
48+
49+
server {
50+
listen unix:$test_dir/lua-resty-etcd.sock;
51+
location / {
52+
access_by_lua_block {
53+
ngx.log(ngx.WARN, "hit with host ", ngx.var.http_host)
54+
}
55+
proxy_pass http://127.0.0.1:2379;
56+
proxy_http_version 1.1;
57+
proxy_set_header Connection "";
58+
proxy_set_header Host \$http_host;
59+
}
60+
}
61+
_EOC_
62+
63+
run_tests();
64+
65+
__DATA__
66+
67+
=== TEST 1: request over unix socket
68+
--- http_config eval: $::HttpConfig
69+
--- config
70+
location /t {
71+
content_by_lua_block {
72+
local etcd, err = require("resty.etcd").new({protocol = "v3", unix_socket_proxy = "unix:$TEST_NGINX_HTML_DIR/lua-resty-etcd.sock"})
73+
check_res(etcd, err)
74+
75+
local res, err = etcd:set("/test", "abc")
76+
check_res(res, err)
77+
78+
ngx.timer.at(0.1, function ()
79+
etcd:set("/test", "bcd3")
80+
end)
81+
82+
ngx.timer.at(0.2, function ()
83+
etcd:set("/test", "bcd4")
84+
end)
85+
86+
local cur_time = ngx.now()
87+
local body_chunk_fun, err, http_cli = etcd:watch("/test", {timeout = 0.5, need_cancel = true})
88+
89+
if type(http_cli) ~= "table" then
90+
ngx.say("need_cancel failed")
91+
end
92+
93+
if not body_chunk_fun then
94+
ngx.say("failed to watch: ", err)
95+
end
96+
97+
local chunk, err = body_chunk_fun()
98+
ngx.say("created: ", chunk.result.created)
99+
local chunk, err = body_chunk_fun()
100+
ngx.say("value: ", chunk.result.events[1].kv.value)
101+
102+
local res, err = etcd:watchcancel(http_cli)
103+
if not res then
104+
ngx.say("failed to cancel: ", err)
105+
end
106+
107+
local chunk, err = body_chunk_fun()
108+
ngx.say(err)
109+
110+
ngx.say("ok")
111+
}
112+
}
113+
--- request
114+
GET /t
115+
--- no_error_log
116+
[error]
117+
--- grep_error_log eval
118+
qr/hit with host 127.0.0.1/
119+
--- grep_error_log_out
120+
hit with host 127.0.0.1
121+
hit with host 127.0.0.1
122+
hit with host 127.0.0.1
123+
--- response_body
124+
created: true
125+
value: bcd3
126+
closed
127+
ok
128+
--- timeout: 5

0 commit comments

Comments
 (0)