Skip to content

Commit 87cb159

Browse files
committed
use thread with Reaper::async_execute
fixes a loop problem with async also adds integration test to test out qos_host and qos_core (aka Reaper)
1 parent f304e3d commit 87cb159

File tree

11 files changed

+317
-74
lines changed

11 files changed

+317
-74
lines changed

src/init/init.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,7 @@ async fn main() {
116116
TimeVal::seconds(5),
117117
);
118118

119-
Reaper::async_execute(&handles, Box::new(Nsm), core_pool, app_pool, None)
120-
.await;
119+
Reaper::async_execute(&handles, Box::new(Nsm), core_pool, app_pool, None);
121120

122121
reboot();
123122
}

src/integration/tests/async_client.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use qos_core::{
2+
async_client::AsyncClient,
3+
async_server::{AsyncRequestProcessor, AsyncSocketServer},
4+
io::{AsyncStreamPool, SocketAddress, TimeVal, TimeValLike},
5+
server::SocketServerError,
6+
};
7+
use tokio::task::JoinHandle;
8+
9+
#[derive(Clone)]
10+
struct EchoProcessor;
11+
12+
impl AsyncRequestProcessor for EchoProcessor {
13+
async fn process(&self, request: Vec<u8>) -> Vec<u8> {
14+
request
15+
}
16+
}
17+
18+
async fn run_echo_server(
19+
socket_path: &str,
20+
) -> Result<Vec<JoinHandle<Result<(), SocketServerError>>>, SocketServerError> {
21+
let timeout = TimeVal::milliseconds(50);
22+
let pool = AsyncStreamPool::new(
23+
std::iter::once(SocketAddress::new_unix(socket_path)),
24+
timeout,
25+
);
26+
let tasks = AsyncSocketServer::listen_all(pool, &EchoProcessor)?;
27+
28+
Ok(tasks)
29+
}
30+
31+
#[tokio::test]
32+
async fn direct_connect_works() {
33+
let socket_path = "/tmp/async_client_test_direct_connect_works.sock";
34+
let sockets = std::iter::once(SocketAddress::new_unix(socket_path));
35+
let timeout = TimeVal::milliseconds(50);
36+
let pool = AsyncStreamPool::new(sockets, timeout).shared();
37+
38+
let client = AsyncClient::new(pool);
39+
40+
let server_tasks = run_echo_server(socket_path).await.unwrap();
41+
42+
let r = client.call(&[0]).await;
43+
assert!(r.is_ok());
44+
45+
for task in server_tasks {
46+
task.abort();
47+
}
48+
}
49+
50+
#[tokio::test]
51+
async fn times_out_properly() {
52+
let socket_path = "/tmp/async_client_test_times_out_properly.sock";
53+
let sockets = std::iter::once(SocketAddress::new_unix(socket_path));
54+
let timeout = TimeVal::milliseconds(50);
55+
let pool = AsyncStreamPool::new(sockets, timeout).shared();
56+
let client = AsyncClient::new(pool);
57+
58+
let r = client.call(&[0]).await;
59+
assert!(r.is_err());
60+
}
61+
62+
#[tokio::test]
63+
async fn repeat_connect_works() {
64+
let socket_path = "/tmp/async_client_test_repeat_connect_works.sock";
65+
let sockets = std::iter::once(SocketAddress::new_unix(socket_path));
66+
let timeout = TimeVal::milliseconds(50);
67+
let pool = AsyncStreamPool::new(sockets, timeout).shared();
68+
let client = AsyncClient::new(pool);
69+
70+
// server not running yet, expect a connection error
71+
let r = client.call(&[0]).await;
72+
assert!(r.is_err());
73+
74+
// start server
75+
let server_tasks = run_echo_server(socket_path).await.unwrap();
76+
77+
// server running, expect success
78+
let r = client.call(&[0]).await;
79+
assert!(r.is_ok());
80+
81+
for task in server_tasks {
82+
task.abort();
83+
}
84+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use std::{process::Command, time::Duration};
2+
3+
use integration::PIVOT_OK_PATH;
4+
use qos_test_primitives::{ChildWrapper, PathWrapper};
5+
6+
const TEST_ENCLAVE_SOCKET: &str = "/tmp/async_qos_host_test.enclave.sock";
7+
const POOL_SIZE: &str = "1";
8+
9+
#[tokio::test]
10+
async fn connects_and_gets_info() {
11+
let _qos_host: ChildWrapper =
12+
Command::new("../target/debug/async_qos_host")
13+
.arg("--usock")
14+
.arg(TEST_ENCLAVE_SOCKET)
15+
.arg("--pool-size")
16+
.arg(POOL_SIZE)
17+
.arg("--host-ip")
18+
.arg("127.0.0.1")
19+
.arg("--host-port")
20+
.arg("3323")
21+
.arg("--socket-timeout")
22+
.arg("50") // ms
23+
.spawn()
24+
.unwrap()
25+
.into();
26+
27+
tokio::time::sleep(Duration::from_millis(100)).await; // let the qos_host start
28+
29+
let r = ureq::get("http://127.0.0.1:3323/qos/enclave-info").call();
30+
assert!(r.is_err()); // expect 500 here
31+
32+
let enclave_socket = format!("{TEST_ENCLAVE_SOCKET}_0"); // manually pick the 1st one
33+
let secret_path: PathWrapper = "./async_qos_host_test.secret".into();
34+
// let eph_path = "reaper_works.eph.key";
35+
let manifest_path: PathWrapper = "async_qos_host_test..manifest".into();
36+
37+
// For our sanity, ensure the secret does not yet exist
38+
drop(std::fs::remove_file(&*secret_path));
39+
// Remove the socket file if it exists as well, in case of bad crashes
40+
drop(std::fs::remove_file(&enclave_socket));
41+
42+
let mut _enclave_child_process: ChildWrapper =
43+
Command::new("../target/debug/async_qos_core")
44+
.args([
45+
"--usock",
46+
TEST_ENCLAVE_SOCKET,
47+
"--quorum-file",
48+
&*secret_path,
49+
"--pivot-file",
50+
PIVOT_OK_PATH,
51+
"--ephemeral-file",
52+
"eph_path",
53+
"--mock",
54+
"--manifest-file",
55+
&*manifest_path,
56+
])
57+
.spawn()
58+
.unwrap()
59+
.into();
60+
61+
// Give the enclave server time to bind to the socket
62+
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
63+
64+
let r = ureq::get("http://127.0.0.1:3323/qos/enclave-info").call();
65+
assert!(r.is_ok()); // expect 200 here
66+
}

src/integration/tests/qos_host.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use std::{process::Command, time::Duration};
2+
3+
use integration::PIVOT_OK_PATH;
4+
use qos_test_primitives::{ChildWrapper, PathWrapper};
5+
6+
const TEST_ENCLAVE_SOCKET: &str = "/tmp/qos_host_test.enclave.sock";
7+
8+
#[test]
9+
fn connects_and_gets_info() {
10+
let _qos_host: ChildWrapper = Command::new("../target/debug/qos_host")
11+
.arg("--usock")
12+
.arg(TEST_ENCLAVE_SOCKET)
13+
.arg("--host-ip")
14+
.arg("127.0.0.1")
15+
.arg("--host-port")
16+
.arg("3323")
17+
.arg("--socket-timeout")
18+
.arg("50") // ms
19+
.spawn()
20+
.unwrap()
21+
.into();
22+
23+
std::thread::sleep(Duration::from_millis(100)); // let the qos_host start
24+
25+
let r = ureq::get("http://127.0.0.1:3323/qos/enclave-info").call();
26+
assert!(r.is_err()); // expect 500 here
27+
28+
let secret_path: PathWrapper = "./reaper_works.secret".into();
29+
// let eph_path = "reaper_works.eph.key";
30+
let manifest_path: PathWrapper = "reaper_works.manifest".into();
31+
32+
// For our sanity, ensure the secret does not yet exist
33+
drop(std::fs::remove_file(&*secret_path));
34+
35+
let mut _enclave_child_process: ChildWrapper =
36+
Command::new("../target/debug/qos_core")
37+
.args([
38+
"--usock",
39+
TEST_ENCLAVE_SOCKET,
40+
"--quorum-file",
41+
&*secret_path,
42+
"--pivot-file",
43+
PIVOT_OK_PATH,
44+
"--ephemeral-file",
45+
"eph_path",
46+
"--mock",
47+
"--manifest-file",
48+
&*manifest_path,
49+
])
50+
.spawn()
51+
.unwrap()
52+
.into();
53+
54+
// Give the enclave server time to bind to the socket
55+
std::thread::sleep(std::time::Duration::from_millis(200));
56+
57+
let r = ureq::get("http://127.0.0.1:3323/qos/enclave-info").call();
58+
assert!(r.is_ok()); // expect 200 here
59+
}
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use qos_core::cli::CLI;
22

3-
#[tokio::main]
4-
async fn main() {
5-
CLI::async_execute().await;
3+
fn main() {
4+
CLI::async_execute();
65
}

src/qos_core/src/cli.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ impl CLI {
203203

204204
/// Execute the enclave server CLI with the environment args using tokio/async
205205
#[cfg(feature = "async")]
206-
pub async fn async_execute() {
206+
pub fn async_execute() {
207207
let mut args: Vec<String> = env::args().collect();
208208
let opts = EnclaveOpts::new(&mut args);
209209

@@ -223,8 +223,7 @@ impl CLI {
223223
opts.async_pool(false),
224224
opts.async_pool(true),
225225
None,
226-
)
227-
.await;
226+
);
228227
}
229228
}
230229
}

src/qos_core/src/io/async_stream.rs

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ impl AsyncStream {
136136
if self.inner.is_none() {
137137
self.connect().await?;
138138
}
139-
140139
self.send(req_buf).await?;
141140
self.recv().await
142141
}
@@ -256,7 +255,7 @@ pub struct AsyncListener {
256255
impl AsyncListener {
257256
/// Bind and listen on the given address.
258257
pub(crate) fn listen(addr: &SocketAddress) -> Result<Self, IOError> {
259-
let listener = match addr {
258+
let listener = match *addr {
260259
SocketAddress::Unix(uaddr) => {
261260
let path =
262261
uaddr.path().ok_or(IOError::ConnectAddressInvalid)?;
@@ -265,8 +264,6 @@ impl AsyncListener {
265264
}
266265
#[cfg(feature = "vm")]
267266
SocketAddress::Vsock(vaddr) => {
268-
let vaddr =
269-
tokio_vsock::VsockAddr::new(vaddr.cid(), vaddr.port());
270267
let inner = InnerListener::Vsock(VsockListener::bind(vaddr)?);
271268
Self { inner }
272269
}
@@ -326,17 +323,26 @@ async fn retry_unix_connect(
326323
let socket = UnixSocket::new_stream()?;
327324

328325
eprintln!("Attempting USOCK connect to: {:?}", addr.path());
329-
match tokio::time::timeout(timeout, socket.connect(path)).await? {
330-
Ok(stream) => {
331-
eprintln!("Connected to USOCK at: {:?}", addr.path());
332-
return Ok(stream);
333-
}
334-
Err(err) => {
335-
eprintln!("Error connecting to USOCK: {err}");
336-
if SystemTime::now() > eot {
337-
return Err(err);
326+
let tr = tokio::time::timeout(timeout, socket.connect(path)).await;
327+
match tr {
328+
Ok(r) => match r {
329+
Ok(stream) => {
330+
eprintln!("Connected to USOCK at: {:?}", addr.path());
331+
return Ok(stream);
338332
}
339-
tokio::time::sleep(sleep_time).await;
333+
Err(err) => {
334+
eprintln!("Error connecting to USOCK: {err}");
335+
if SystemTime::now() > eot {
336+
return Err(err);
337+
}
338+
tokio::time::sleep(sleep_time).await;
339+
}
340+
},
341+
Err(err) => {
342+
eprintln!(
343+
"Connecting to USOCK failed with timeout error: {err}"
344+
);
345+
return Err(err.into());
340346
}
341347
}
342348
}
@@ -354,18 +360,27 @@ async fn retry_vsock_connect(
354360

355361
loop {
356362
eprintln!("Attempting VSOCK connect to: {:?}", addr);
357-
match tokio::time::timeout(timeout, VsockStream::connect(*addr)).await?
358-
{
359-
Ok(stream) => {
360-
eprintln!("Connected to VSOCK at: {:?}", addr);
361-
return Ok(stream);
362-
}
363-
Err(err) => {
364-
eprintln!("Error connecting to VSOCK: {}", err);
365-
if SystemTime::now() > eot {
366-
return Err(err);
363+
let tr =
364+
tokio::time::timeout(timeout, VsockStream::connect(*addr)).await;
365+
match tr {
366+
Ok(r) => match r {
367+
Ok(stream) => {
368+
eprintln!("Connected to VSOCK at: {:?}", addr);
369+
return Ok(stream);
370+
}
371+
Err(err) => {
372+
eprintln!("Error connecting to VSOCK: {}", err);
373+
if SystemTime::now() > eot {
374+
return Err(err);
375+
}
376+
tokio::time::sleep(sleep_time).await;
367377
}
368-
tokio::time::sleep(sleep_time).await;
378+
},
379+
Err(err) => {
380+
eprintln!(
381+
"Connecting to VSOCK failed with timeout error: {err}"
382+
);
383+
return Err(err.into());
369384
}
370385
}
371386
}

src/qos_core/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ pub const SEC_APP_SOCK: &str = "./local-enclave/sec_app.sock";
6565
/// Default socket for enclave <-> secure app communication.
6666
#[cfg(feature = "vm")]
6767
pub const SEC_APP_SOCK: &str = "/sec_app.sock";
68-
68+
/// Default socket connect timeout in milliseconds
69+
pub const DEFAULT_SOCKET_TIMEOUT: &str = "5000";
6970
/// Default socket pool size is 20
7071
#[cfg(feature = "async")]
7172
pub const DEFAULT_POOL_SIZE: &str = "1"; // DEBUG: ales - set to something real after debugging

0 commit comments

Comments
 (0)