Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 147 additions & 148 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "ztunnel"
version = "0.0.0"
edition = "2024"
rust-version = "1.85"
rust-version = "1.90"

[features]
default = ["tls-aws-lc"]
Expand Down Expand Up @@ -61,7 +61,7 @@ hickory-client = "0.25"
hickory-proto = "0.25"
hickory-resolver = "0.25"
hickory-server = { version = "0.25", features = [ "resolver" ]}
http-body = { package = "http-body", version = "1" }
http-body = { version = "1" }
http-body-util = "0.1"
hyper = { version = "1.6", features = ["full"] }
hyper-rustls = { version = "0.27.0", default-features = false, features = ["logging", "http1", "http2"] }
Expand All @@ -71,11 +71,11 @@ itertools = "0.14"
keyed_priority_queue = "0.4"
libc = "0.2"
log = "0.4"
nix = { version = "0.29", features = ["socket", "sched", "uio", "fs", "ioctl", "user", "net", "mount", "resource" ] }
nix = { version = "0.30", features = ["socket", "sched", "uio", "fs", "ioctl", "user", "net", "mount", "resource" ] }
once_cell = "1.21"
num_cpus = "1.16"
ppp = "2.3"
prometheus-client = { version = "0.23" }
prometheus-client = { version = "0.24" }
prometheus-parse = "0.2"
prost = "0.13"
prost-types = "0.13"
Expand Down Expand Up @@ -103,7 +103,7 @@ x509-parser = { version = "0.17", default-features = false }
tracing-log = "0.2"
backoff = "0.4"
pin-project-lite = "0.2"
pingora-pool = "0.4"
pingora-pool = "0.6"
flurry = "0.5"
h2 = "0.4"
http = "1.3"
Expand Down
26 changes: 13 additions & 13 deletions fuzz/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,15 +385,15 @@ fn change_log_level(reset: bool, level: &str) -> Response<Full<Bytes>> {
if !reset && level.is_empty() {
return list_loggers();
}
if !level.is_empty() {
if let Err(_e) = validate_log_level(level) {
// Invalid level provided
return plaintext_response(
hyper::StatusCode::BAD_REQUEST,
format!("Invalid level provided: {level}\n{HELP_STRING}"),
);
};
}
if !level.is_empty()
&& let Err(_e) = validate_log_level(level)
{
// Invalid level provided
return plaintext_response(
hyper::StatusCode::BAD_REQUEST,
format!("Invalid level provided: {level}\n{HELP_STRING}"),
);
};
match telemetry::set_level(reset, level) {
Ok(_) => list_loggers(),
Err(e) => plaintext_response(
Expand Down
8 changes: 4 additions & 4 deletions src/cert_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ impl CertFetcherImpl {

impl CertFetcher for CertFetcherImpl {
fn prefetch_cert(&self, w: &Workload) {
if self.should_prefetch_certificate(w) {
if let Err(e) = self.tx.try_send(Request::Fetch(w.identity(), Warmup)) {
info!("couldn't prefetch: {:?}", e)
}
if self.should_prefetch_certificate(w)
&& let Err(e) = self.tx.try_send(Request::Fetch(w.identity(), Warmup))
{
info!("couldn't prefetch: {:?}", e)
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,10 +982,10 @@ fn construct_proxy_config(mc_path: &str, pc_env: Option<&str>) -> anyhow::Result
}

pub fn empty_to_none<A: AsRef<str>>(inp: Option<A>) -> Option<A> {
if let Some(inner) = &inp {
if inner.as_ref().is_empty() {
return None;
}
if let Some(inner) = &inp
&& inner.as_ref().is_empty()
{
return None;
}
inp
}
Expand Down
12 changes: 6 additions & 6 deletions src/dns/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,12 @@ mod tests {
.expect("expected resolve error");

// Expect NoRecordsFound with a NXDomain response code.
if let ResolveErrorKind::Proto(proto) = err.kind() {
if let ProtoErrorKind::NoRecordsFound { response_code, .. } = proto.kind() {
// Respond with the error code.
assert_eq!(&ResponseCode::NXDomain, response_code);
return;
}
if let ResolveErrorKind::Proto(proto) = err.kind()
&& let ProtoErrorKind::NoRecordsFound { response_code, .. } = proto.kind()
{
// Respond with the error code.
assert_eq!(&ResponseCode::NXDomain, response_code);
return;
}
panic!("unexpected error kind {}", err.kind())
}
Expand Down
10 changes: 5 additions & 5 deletions src/dns/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ async fn send_lookup_error<R: ResponseHandler>(
}
LookupError::ResponseCode(code) => send_error(request, response_handle, code).await,
LookupError::ResolveError(e) => {
if let ResolveErrorKind::Proto(proto) = e.kind() {
if let ProtoErrorKind::NoRecordsFound { response_code, .. } = proto.kind() {
// Respond with the error code.
return send_error(request, response_handle, *response_code).await;
}
if let ResolveErrorKind::Proto(proto) = e.kind()
&& let ProtoErrorKind::NoRecordsFound { response_code, .. } = proto.kind()
{
// Respond with the error code.
return send_error(request, response_handle, *response_code).await;
}
// TODO(nmittler): log?
send_error(request, response_handle, ResponseCode::ServFail).await
Expand Down
8 changes: 4 additions & 4 deletions src/identity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,10 +609,10 @@ impl SecretManager {
let rx = st.rx.clone();
drop(certs);

if let Some(existing_pri) = init_pri(&rx) {
if pri > existing_pri {
self.post(Request::Fetch(id.clone(), pri)).await;
}
if let Some(existing_pri) = init_pri(&rx)
&& pri > existing_pri
{
self.post(Request::Fetch(id.clone(), pri)).await;
}
Ok(rx)
}
Expand Down
6 changes: 3 additions & 3 deletions src/inpod/netns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
// limitations under the License.

use nix::sched::{CloneFlags, setns};
use std::os::fd::OwnedFd;
use std::os::unix::io::AsRawFd;
use std::os::fd::{AsFd, OwnedFd};
use std::sync::Arc;

#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
Expand Down Expand Up @@ -53,7 +52,7 @@ impl InpodNetns {
}

pub fn new(cur_netns: Arc<OwnedFd>, workload_netns: OwnedFd) -> std::io::Result<Self> {
let res = nix::sys::stat::fstat(workload_netns.as_raw_fd())
let res = nix::sys::stat::fstat(workload_netns.as_fd())
.map_err(|e| std::io::Error::from_raw_os_error(e as i32))?;
let inode = res.st_ino;
let dev = res.st_dev;
Expand Down Expand Up @@ -110,6 +109,7 @@ mod tests {
use nix::sched::unshare;
use nix::unistd::gettid;
use std::assert;
use std::os::fd::AsRawFd;
use std::os::fd::OwnedFd;
use std::process::Command;

Expand Down
12 changes: 6 additions & 6 deletions src/proxy/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,12 @@ impl ConnectionManager {
// uses a counter to determine if there are other tracked connections or not so it may retain the tx/rx channels when necessary
pub fn release(&self, c: &InboundConnection) {
let mut drains = self.drains.write().expect("mutex");
if let Some((k, mut v)) = drains.remove_entry(c) {
if v.count > 1 {
// something else is tracking this connection, decrement count but retain
v.count -= 1;
drains.insert(k, v);
}
if let Some((k, mut v)) = drains.remove_entry(c)
&& v.count > 1
{
// something else is tracking this connection, decrement count but retain
v.count -= 1;
drains.insert(k, v);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/proxy/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ pub struct CommonTrafficLabels {
#[derive(Clone, Hash, Default, Debug, PartialEq, Eq)]
struct OptionallyEncode<T>(Option<T>);
impl<T: EncodeLabelSet> EncodeLabelSet for OptionallyEncode<T> {
fn encode(&self, encoder: LabelSetEncoder) -> Result<(), std::fmt::Error> {
fn encode(&self, encoder: &mut LabelSetEncoder) -> Result<(), std::fmt::Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

due to prom client change

match &self.0 {
None => Ok(()),
Some(ll) => ll.encode(encoder),
Expand Down
9 changes: 4 additions & 5 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,17 +517,16 @@ impl OutboundConnection {
)
.await?
else {
if let Some(service) = service {
if service.
if let Some(service) = service
&& service.
load_balancer.
as_ref().
// If we are not a passthrough service, we should have an upstream
map(|lb| lb.mode != LoadBalancerMode::Passthrough).
// If the service had no lb, we should have an upstream
unwrap_or(true)
{
return Err(Error::NoHealthyUpstream(target));
}
{
return Err(Error::NoHealthyUpstream(target));
}
debug!("built request as passthrough; no upstream found");
return Ok(Request {
Expand Down
8 changes: 4 additions & 4 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,10 @@ impl ProxyState {
.services
.get_by_vip(&network_addr(network.clone(), addr.ip()))
{
if let Some(lb) = &svc.load_balancer {
if lb.mode == LoadBalancerMode::Passthrough {
return Some(UpstreamDestination::OriginalDestination);
}
if let Some(lb) = &svc.load_balancer
&& lb.mode == LoadBalancerMode::Passthrough
{
return Some(UpstreamDestination::OriginalDestination);
}
return self.find_upstream_from_service(
source_workload,
Expand Down
11 changes: 5 additions & 6 deletions src/state/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,11 @@ impl PolicyStore {
RbacScope::Global => Some(strng::EMPTY),
RbacScope::Namespace => Some(rbac.namespace),
RbacScope::WorkloadSelector => None,
} {
if let Some(pl) = self.by_namespace.get_mut(&key) {
pl.remove(&xds_name);
if pl.is_empty() {
self.by_namespace.remove(&key);
}
} && let Some(pl) = self.by_namespace.get_mut(&key)
{
pl.remove(&xds_name);
if pl.is_empty() {
self.by_namespace.remove(&key);
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/state/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,10 +825,9 @@ impl WorkloadStore {
for wip in prev.workload_ips.iter() {
if let Entry::Occupied(mut o) =
self.by_addr.entry(network_addr(prev.network.clone(), *wip))
&& o.get_mut().remove_uid(prev.uid.clone())
{
if o.get_mut().remove_uid(prev.uid.clone()) {
o.remove();
}
o.remove();
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,10 @@ where
for span in scope.from_root() {
write!(writer, ":{}", span.metadata().name())?;
let ext = span.extensions();
if let Some(fields) = &ext.get::<FormattedFields<N>>() {
if !fields.is_empty() {
write!(writer, "{{{fields}}}")?;
}
if let Some(fields) = &ext.get::<FormattedFields<N>>()
&& !fields.is_empty()
{
write!(writer, "{{{fields}}}")?;
}
}
};
Expand Down
8 changes: 4 additions & 4 deletions src/xds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ impl fmt::Display for DisplayStatus<'_> {
" (hint: check the control plane logs for more information)"
)?;
}
if !s.details().is_empty() {
if let Ok(st) = std::str::from_utf8(s.details()) {
write!(f, ", details: {st}")?;
}
if !s.details().is_empty()
&& let Ok(st) = std::str::from_utf8(s.details())
{
write!(f, ", details: {st}")?;
}
if let Some(src) = s.source().and_then(|s| s.source()) {
write!(f, ", source: {src}")?;
Expand Down
7 changes: 3 additions & 4 deletions src/xds/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,14 +666,13 @@ impl AdsClient {
if !self.types_to_expect.is_empty() {
received_type = Some(msg.type_url.clone())
}
if let XdsSignal::Ack = self.handle_stream_event(msg, &discovery_req_tx).await? {
if let Some(received_type) = received_type {
if let XdsSignal::Ack = self.handle_stream_event(msg, &discovery_req_tx).await?
&& let Some(received_type) = received_type {
self.types_to_expect.remove(&received_type);
if self.types_to_expect.is_empty() {
mem::drop(mem::take(&mut self.block_ready));
}
}
};
};
}
}
}
Expand Down