From 0ef29ab0614014ba2c4181c50e1363b6918decc2 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 27 Jun 2025 17:11:52 +0200 Subject: [PATCH 01/31] WIP --- Cargo.lock | 1 + crates/stackable-certs/src/ca/consts.rs | 2 +- crates/stackable-certs/src/lib.rs | 4 + crates/stackable-operator/src/cli.rs | 67 +++- .../src/instrumentation/axum/mod.rs | 16 - crates/stackable-webhook/Cargo.toml | 1 + crates/stackable-webhook/src/lib.rs | 121 +++---- .../src/servers/conversion.rs | 308 +++++++++++------- crates/stackable-webhook/src/servers/mod.rs | 3 +- crates/stackable-webhook/src/tls.rs | 120 ++++--- 10 files changed, 407 insertions(+), 236 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c89c0e3e1..fbcd856fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3129,6 +3129,7 @@ dependencies = [ "tower-http", "tracing", "tracing-opentelemetry", + "x509-cert", ] [[package]] diff --git a/crates/stackable-certs/src/ca/consts.rs b/crates/stackable-certs/src/ca/consts.rs index 125a63a05..bcd080cd4 100644 --- a/crates/stackable-certs/src/ca/consts.rs +++ b/crates/stackable-certs/src/ca/consts.rs @@ -1,6 +1,6 @@ use stackable_operator::time::Duration; -/// The default CA validity time span of one hour (3600 seconds). +/// The default CA validity time span pub const DEFAULT_CA_VALIDITY: Duration = Duration::from_hours_unchecked(1); /// The root CA subject name containing only the common name. 
diff --git a/crates/stackable-certs/src/lib.rs b/crates/stackable-certs/src/lib.rs index 5b9c87327..122b1dd53 100644 --- a/crates/stackable-certs/src/lib.rs +++ b/crates/stackable-certs/src/lib.rs @@ -22,6 +22,7 @@ use std::ops::Deref; use snafu::Snafu; +use stackable_operator::time::Duration; use x509_cert::{Certificate, spki::EncodePublicKey}; #[cfg(feature = "rustls")] use { @@ -36,6 +37,9 @@ use crate::keys::CertificateKeypair; pub mod ca; pub mod keys; +/// The default certificate validity time span +pub const DEFAULT_CERTIFICATE_VALIDITY: Duration = Duration::from_hours_unchecked(1); + /// Error variants which can be encountered when creating a new /// [`CertificatePair`]. #[derive(Debug, Snafu)] diff --git a/crates/stackable-operator/src/cli.rs b/crates/stackable-operator/src/cli.rs index 50f36421e..c0086f04d 100644 --- a/crates/stackable-operator/src/cli.rs +++ b/crates/stackable-operator/src/cli.rs @@ -163,7 +163,7 @@ pub enum Command { /// Can be embedded into an extended argument set: /// /// ```rust -/// # use stackable_operator::cli::{Command, ProductOperatorRun, ProductConfigPath}; +/// # use stackable_operator::cli::{Command, OperatorEnvironmentOpts, ProductOperatorRun, ProductConfigPath}; /// use clap::Parser; /// use stackable_operator::namespace::WatchNamespace; /// use stackable_telemetry::tracing::TelemetryOptions; @@ -176,7 +176,20 @@ pub enum Command { /// common: ProductOperatorRun, /// } /// -/// let opts = Command::::parse_from(["foobar-operator", "run", "--name", "foo", "--product-config", "bar", "--watch-namespace", "foobar"]); +/// let opts = Command::::parse_from([ +/// "foobar-operator", +/// "run", +/// "--name", +/// "foo", +/// "--product-config", +/// "bar", +/// "--watch-namespace", +/// "foobar", +/// "--operator-namespace", +/// "stackable-operators", +/// "--operator-service-name", +/// "foo-operator", +/// ]); /// assert_eq!(opts, Command::Run(Run { /// name: "foo".to_string(), /// common: ProductOperatorRun { @@ -184,6 
+197,10 @@ pub enum Command { /// watch_namespace: WatchNamespace::One("foobar".to_string()), /// telemetry_arguments: TelemetryOptions::default(), /// cluster_info_opts: Default::default(), +/// operator_environment: OperatorEnvironmentOpts { +/// operator_namespace: "stackable-operators".to_string(), +/// operator_service_name: "foo-operator".to_string(), +/// }, /// }, /// })); /// ``` @@ -216,6 +233,9 @@ pub struct ProductOperatorRun { #[arg(long, env, default_value = "")] pub watch_namespace: WatchNamespace, + #[command(flatten)] + pub operator_environment: OperatorEnvironmentOpts, + #[command(flatten)] pub telemetry_arguments: TelemetryOptions, @@ -278,6 +298,18 @@ impl ProductConfigPath { } } +#[derive(clap::Parser, Debug, PartialEq, Eq)] +pub struct OperatorEnvironmentOpts { + /// The namespace the operator is running in, usually `stackable-operators`. + #[arg(long, env)] + pub operator_namespace: String, + + /// The name of the service the operator is reachable at, usually + /// something like `-operator`. 
+ #[arg(long, env)] + pub operator_service_name: String, +} + #[cfg(test)] mod tests { use std::{env, fs::File}; @@ -292,6 +324,8 @@ mod tests { const DEPLOY_FILE_PATH: &str = "deploy_config_spec_properties.yaml"; const DEFAULT_FILE_PATH: &str = "default_file_path_properties.yaml"; const WATCH_NAMESPACE: &str = "WATCH_NAMESPACE"; + const OPERATOR_NAMESPACE: &str = "OPERATOR_NAMESPACE"; + const OPERATOR_SERVICE_NAME: &str = "OPERATOR_SERVICE_NAME"; #[test] fn verify_cli() { @@ -388,6 +422,10 @@ mod tests { "bar", "--watch-namespace", "foo", + "--operator-namespace", + "stackable-operators", + "--operator-service-name", + "foo-operator", ]); assert_eq!( opts, @@ -396,11 +434,23 @@ mod tests { watch_namespace: WatchNamespace::One("foo".to_string()), cluster_info_opts: Default::default(), telemetry_arguments: Default::default(), + operator_environment: OperatorEnvironmentOpts { + operator_namespace: "stackable-operators".to_string(), + operator_service_name: "foo-operator".to_string(), + } } ); // no cli / no env - let opts = ProductOperatorRun::parse_from(["run", "--product-config", "bar"]); + let opts = ProductOperatorRun::parse_from([ + "run", + "--product-config", + "bar", + "--operator-namespace", + "stackable-operators", + "--operator-service-name", + "foo-operator", + ]); assert_eq!( opts, ProductOperatorRun { @@ -408,11 +458,18 @@ mod tests { watch_namespace: WatchNamespace::All, cluster_info_opts: Default::default(), telemetry_arguments: Default::default(), + operator_environment: OperatorEnvironmentOpts { + operator_namespace: "stackable-operators".to_string(), + operator_service_name: "foo-operator".to_string(), + } } ); // env with namespace unsafe { env::set_var(WATCH_NAMESPACE, "foo") }; + unsafe { env::set_var(OPERATOR_SERVICE_NAME, "foo-operator") }; + unsafe { env::set_var(OPERATOR_NAMESPACE, "stackable-operators") }; + let opts = ProductOperatorRun::parse_from(["run", "--product-config", "bar"]); assert_eq!( opts, @@ -421,6 +478,10 @@ mod tests { 
watch_namespace: WatchNamespace::One("foo".to_string()), cluster_info_opts: Default::default(), telemetry_arguments: Default::default(), + operator_environment: OperatorEnvironmentOpts { + operator_namespace: "stackable-operators".to_string(), + operator_service_name: "foo-operator".to_string(), + } } ); } diff --git a/crates/stackable-telemetry/src/instrumentation/axum/mod.rs b/crates/stackable-telemetry/src/instrumentation/axum/mod.rs index b72450c99..14e7928cd 100644 --- a/crates/stackable-telemetry/src/instrumentation/axum/mod.rs +++ b/crates/stackable-telemetry/src/instrumentation/axum/mod.rs @@ -73,22 +73,6 @@ const OTEL_TRACE_ID_TO: &str = "opentelemetry.trace_id.to"; /// # let _: Router = router; /// ``` /// -/// ### Example with Webhook -/// -/// The usage is even simpler when combined with the `stackable_webhook` crate. -/// The webhook server has built-in support to automatically emit HTTP spans on -/// every incoming request. -/// -/// ``` -/// use stackable_webhook::{WebhookServer, Options}; -/// use axum::Router; -/// -/// let router = Router::new(); -/// let server = WebhookServer::new(router, Options::default()); -/// -/// # let _: WebhookServer = server; -/// ``` -/// /// This layer is implemented based on [this][1] official Tower guide. /// /// [1]: https://github.com/tower-rs/tower/blob/master/guides/building-a-middleware-from-scratch.md diff --git a/crates/stackable-webhook/Cargo.toml b/crates/stackable-webhook/Cargo.toml index da553f188..abf5aac00 100644 --- a/crates/stackable-webhook/Cargo.toml +++ b/crates/stackable-webhook/Cargo.toml @@ -26,3 +26,4 @@ tower-http.workspace = true tower.workspace = true tracing.workspace = true tracing-opentelemetry.workspace = true +x509-cert.workspace = true diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index e1bb001a9..a1f2e2786 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -1,6 +1,6 @@ //! 
Utility types and functions to easily create ready-to-use webhook servers //! which can handle different tasks, for example CRD conversions. All webhook -//! servers use HTTPS by defaultThis library is fully compatible with the +//! servers use HTTPS by default. This library is fully compatible with the //! [`tracing`] crate and emits debug level tracing data. //! //! Most users will only use the top-level exported generic [`WebhookServer`] @@ -10,25 +10,30 @@ //! ``` //! use stackable_webhook::{WebhookServer, Options}; //! use axum::Router; +//! use tokio::sync::mpsc; //! +//! let (cert_tx, _cert_rx) = mpsc::channel(1); //! let router = Router::new(); -//! let server = WebhookServer::new(router, Options::default()); +//! let server = WebhookServer::new(router, Options::default(), vec![], cert_tx); //! ``` //! //! For some usages, complete end-to-end [`WebhookServer`] implementations -//! exist. One such implementation is the [`ConversionWebhookServer`][1]. The -//! only required parameters are a conversion handler function and [`Options`]. +//! exist. One such implementation is the [`ConversionWebhookServer`][1]. //! //! This library additionally also exposes lower-level structs and functions to -//! enable complete controll over these details if needed. +//! enable complete control over these details if needed. //! //! [1]: crate::servers::ConversionWebhookServer use axum::{Router, routing::get}; use futures_util::{FutureExt as _, pin_mut, select}; use snafu::{ResultExt, Snafu}; use stackable_telemetry::AxumTraceLayer; -use tokio::signal::unix::{SignalKind, signal}; +use tokio::{ + signal::unix::{SignalKind, signal}, + sync::mpsc, +}; use tower::ServiceBuilder; +use x509_cert::Certificate; // use tower_http::trace::TraceLayer; use crate::tls::TlsServer; @@ -41,10 +46,6 @@ pub mod tls; // Selected re-exports pub use crate::options::Options; -/// A result type alias with the library-level [`Error`] type as teh default -/// error type. 
-pub type Result = std::result::Result; - /// A generic webhook handler receiving a request and sending back a response. /// /// This trait is not intended to be implemented by external crates and this @@ -52,24 +53,12 @@ pub type Result = std::result::Result; /// implementation is part of the [`ConversionWebhookServer`][1]. /// /// [1]: crate::servers::ConversionWebhookServer -pub(crate) trait WebhookHandler { +pub trait WebhookHandler { fn call(self, req: Req) -> Res; } -/// A generic webhook handler receiving a request and state and sending back -/// a response. -/// -/// This trait is not intended to be implemented by external crates and this -/// library provides various ready-to-use implementations for it. One such an -/// implementation is part of the [`ConversionWebhookServer`][1]. -/// -/// [1]: crate::servers::ConversionWebhookServer -pub(crate) trait StatefulWebhookHandler { - fn call(self, req: Req, state: S) -> Res; -} - #[derive(Debug, Snafu)] -pub enum Error { +pub enum WebhookError { #[snafu(display("failed to create TLS server"))] CreateTlsServer { source: tls::Error }, @@ -88,8 +77,7 @@ pub enum Error { /// /// [1]: crate::servers::ConversionWebhookServer pub struct WebhookServer { - options: Options, - router: Router, + tls_server: TlsServer, } impl WebhookServer { @@ -108,9 +96,11 @@ impl WebhookServer { /// ``` /// use stackable_webhook::{WebhookServer, Options}; /// use axum::Router; + /// use tokio::sync::mpsc; /// + /// let (cert_tx, _cert_rx) = mpsc::channel(1); /// let router = Router::new(); - /// let server = WebhookServer::new(router, Options::default()); + /// let server = WebhookServer::new(router, Options::default(), vec![], cert_tx); /// ``` /// /// ### Example with Custom Options @@ -118,23 +108,61 @@ impl WebhookServer { /// ``` /// use stackable_webhook::{WebhookServer, Options}; /// use axum::Router; + /// use tokio::sync::mpsc; /// + /// let (cert_tx, _cert_rx) = mpsc::channel(1); /// let options = Options::builder() /// 
.bind_address([127, 0, 0, 1], 8080) /// .build(); + /// let sans = vec!["my-san-entry".to_string()]; /// /// let router = Router::new(); - /// let server = WebhookServer::new(router, options); + /// let server = WebhookServer::new(router, options, sans, cert_tx); /// ``` - pub fn new(router: Router, options: Options) -> Self { + pub async fn new( + router: Router, + options: Options, + subject_alterative_dns_names: Vec, + cert_tx: mpsc::Sender, + ) -> Result { tracing::trace!("create new webhook server"); - Self { options, router } + + // TODO (@Techassi): Make opt-in configurable from the outside + // Create an OpenTelemetry tracing layer + tracing::trace!("create tracing service (layer)"); + let trace_layer = AxumTraceLayer::new().with_opt_in(); + + // Use a service builder to provide multiple layers at once. Recommended + // by the Axum project. + // + // See https://docs.rs/axum/latest/axum/middleware/index.html#applying-multiple-middleware + // TODO (@NickLarsenNZ): rename this server_builder and keep it specific to tracing, since it's placement in the chain is important + let service_builder = ServiceBuilder::new().layer(trace_layer); + + // Create the root router and merge the provided router into it. + tracing::debug!("create core router and merge provided router"); + let router = router + .layer(service_builder) + // The health route is below the AxumTraceLayer so as not to be instrumented + .route("/health", get(|| async { "ok" })); + + tracing::debug!("create TLS server"); + let tls_server = TlsServer::new( + options.socket_addr, + router, + subject_alterative_dns_names, + cert_tx, + ) + .await + .context(CreateTlsServerSnafu)?; + + Ok(Self { tls_server }) } /// Runs the Webhook server and sets up signal handlers for shutting down. /// /// This does not implement graceful shutdown of the underlying server. 
- pub async fn run(self) -> Result<()> { + pub async fn run(self) -> Result<(), WebhookError> { let future_server = self.run_server(); let future_signal = async { let mut sigint = signal(SignalKind::interrupt()).expect("create SIGINT listener"); @@ -167,36 +195,9 @@ impl WebhookServer { /// Runs the webhook server by creating a TCP listener and binding it to /// the specified socket address. - async fn run_server(self) -> Result<()> { + async fn run_server(self) -> Result<(), WebhookError> { tracing::debug!("run webhook server"); - // TODO (@Techassi): Make opt-in configurable from the outside - // Create an OpenTelemetry tracing layer - tracing::trace!("create tracing service (layer)"); - let trace_layer = AxumTraceLayer::new().with_opt_in(); - - // Use a service builder to provide multiple layers at once. Recommended - // by the Axum project. - // - // See https://docs.rs/axum/latest/axum/middleware/index.html#applying-multiple-middleware - // TODO (@NickLarsenNZ): rename this server_builder and keep it specific to tracing, since it's placement in the chain is important - let service_builder = ServiceBuilder::new().layer(trace_layer); - - // Create the root router and merge the provided router into it. 
- tracing::debug!("create core router and merge provided router"); - let router = self - .router - .layer(service_builder) - // The health route is below the AxumTraceLayer so as not to be instrumented - .route("/health", get(|| async { "ok" })); - - // Create server for TLS termination - tracing::debug!("create TLS server"); - let tls_server = TlsServer::new(self.options.socket_addr, router) - .await - .context(CreateTlsServerSnafu)?; - - tracing::info!("running TLS server"); - tls_server.run().await.context(RunTlsServerSnafu) + self.tls_server.run().await.context(RunTlsServerSnafu) } } diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index 922a9b431..7877fd4a8 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -1,14 +1,55 @@ -use std::fmt::Debug; +use std::{collections::HashMap, fmt::Debug}; -use axum::{Json, Router, extract::State, routing::post}; +use axum::{Json, Router, routing::post}; +use k8s_openapi::{ + ByteString, + apiextensions_apiserver::pkg::apis::apiextensions::v1::{ + CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig, + WebhookConversion, + }, +}; // Re-export this type because users of the conversion webhook server require // this type to write the handler function. Instead of importing this type from // kube directly, consumers can use this type instead. This also eliminates // keeping the kube dependency version in sync between here and the operator. 
pub use kube::core::conversion::ConversionReview; +use kube::{ + Api, Client, ResourceExt, + api::{Patch, PatchParams}, +}; +use snafu::{OptionExt, ResultExt, Snafu}; +use stackable_operator::cli::OperatorEnvironmentOpts; +use tokio::sync::mpsc; use tracing::instrument; +use x509_cert::{ + Certificate, + der::{EncodePem, pem::LineEnding}, +}; -use crate::{StatefulWebhookHandler, WebhookHandler, WebhookServer, options::Options}; +use crate::{ + WebhookError, WebhookHandler, WebhookServer, constants::DEFAULT_HTTPS_PORT, options::Options, +}; + +#[derive(Debug, Snafu)] +pub enum ConversionWebhookError { + #[snafu(display("failed to create webhook server"))] + CreateWebhookServer { source: WebhookError }, + + #[snafu(display("failed to run webhook server"))] + RunWebhookServer { source: WebhookError }, + + #[snafu(display("failed to receive certificate from channel"))] + ReceiverCertificateFromChannel, + + #[snafu(display("failed to convert CA certificate into PEM format"))] + ConvertCaToPem { source: x509_cert::der::Error }, + + #[snafu(display("failed to update CRD {crd_name:?}"))] + UpdateCRD { + source: stackable_operator::kube::Error, + crd_name: String, + }, +} impl WebhookHandler for F where @@ -19,141 +60,190 @@ where } } -impl StatefulWebhookHandler for F -where - F: FnOnce(ConversionReview, S) -> ConversionReview, -{ - fn call(self, req: ConversionReview, state: S) -> ConversionReview { - self(req, state) - } -} - /// A ready-to-use CRD conversion webhook server. /// -/// See [`ConversionWebhookServer::new()`] and [`ConversionWebhookServer::new_with_state()`] -/// for usage examples. +/// See [`ConversionWebhookServer::new()`] for usage examples. 
pub struct ConversionWebhookServer { - options: Options, - router: Router, + server: WebhookServer, + current_cert: Certificate, + + client: Client, + field_manager: String, + crds: HashMap, + operator_environment: OperatorEnvironmentOpts, } impl ConversionWebhookServer { - /// Creates a new conversion webhook server **without** state which expects - /// POST requests being made to the `/convert` endpoint. + /// Creates a new conversion webhook server, which expects POST requests being made to the + /// `/convert/{crd name}` endpoint. + /// + /// You need to provide two things for every CRD passed in via the `crds_and_handlers` argument: + /// + /// 1. The CRD + /// 2. A conversion function to convert between CRD versions. Typically you would use the + /// the auto-generated `try_convert` function on CRD spec definition structs for this. /// - /// Each request is handled by the provided `handler` function. Any function - /// with the signature `(ConversionReview) -> ConversionReview` can be - /// provided. The [`ConversionReview`] type can be imported via a re-export at - /// [`crate::servers::ConversionReview`]. + /// The [`ConversionWebhookServer`] takes care of reconciling the CRDs into the Kubernetes + /// cluster and takes care of adding itself as conversion webhook. This includes TLS + /// certificates and CA bundles. 
/// /// # Example /// - /// ``` + /// ```no_run /// use stackable_webhook::{ /// servers::{ConversionReview, ConversionWebhookServer}, /// Options /// }; + /// use stackable_operator::cli::OperatorEnvironmentOpts; + /// use stackable_operator::kube::Client; + /// use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion}; + /// + /// # async fn test() { + /// let crds_and_handlers = [ + /// ( + /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1).unwrap(), + /// S3Connection::try_convert as fn(ConversionReview) -> ConversionReview, + /// ), + /// ]; + /// + /// const OPERATOR_NAME: &str = "PRODUCT_OPERATOR"; + /// let client = Client::try_default().await.expect("failed to create Kubernetes client"); + /// // Normally you would get this from the CLI arguments in `ProductOperatorRun::operator_environment` + /// let operator_environment = OperatorEnvironmentOpts { + /// operator_namespace: "stackable-operator".to_string(), + /// operator_service_name: "product-operator".to_string(), + /// }; /// /// // Construct the conversion webhook server - /// let server = ConversionWebhookServer::new(handler, Options::default()); + /// let conversion_webhook = ConversionWebhookServer::new( + /// crds_and_handlers, + /// stackable_webhook::Options::default(), + /// client, + /// OPERATOR_NAME, + /// operator_environment, + /// ) + /// .await + /// .expect("failed to create ConversionWebhookServer"); /// - /// // Define the handler function - /// fn handler(req: ConversionReview) -> ConversionReview { - /// // In here we can do the CRD conversion - /// req - /// } + /// // Bootstrap CRDs first to avoid "too old resource version" error + /// conversion_webhook.reconcile_crds().await.expect("failed to reconcile CRDs"); + /// # } /// ``` - #[instrument(name = "create_conversion_webhhok_server", skip(handler))] - pub fn new(handler: H, options: Options) -> Self + #[instrument( + name = "create_conversion_webhook_server", + skip(crds_and_handlers, client) + )] + pub 
async fn new( + crds_and_handlers: impl IntoIterator, + options: Options, + client: Client, + field_manager: impl Into + Debug, + operator_environment: OperatorEnvironmentOpts, + ) -> Result where H: WebhookHandler + Clone + Send + Sync + 'static, { tracing::debug!("create new conversion webhook server"); - let handler_fn = |Json(review): Json| async { - let review = handler.call(review); - Json(review) - }; + let mut router = Router::new(); + let mut crds = HashMap::new(); + for (crd, handler) in crds_and_handlers { + let crd_name = crd.name_any(); + let handler_fn = |Json(review): Json| async { + let review = handler.call(review); + Json(review) + }; + + router = router.route(&format!("/convert/{crd_name}"), post(handler_fn)); + crds.insert(crd_name, crd); + } + + // This is how Kubernetes calls us, so it decides about the naming. + // AFAIK we can not influence this, so this is the only SAN entry needed. + let webhook_domain_name = format!( + "{service_name}.{operator_namespace}.svc", + service_name = operator_environment.operator_service_name, + operator_namespace = operator_environment.operator_namespace, + ); - let router = Router::new().route("/convert", post(handler_fn)); - Self { router, options } + let (cert_tx, mut cert_rx) = mpsc::channel(1); + let server = WebhookServer::new(router, options, vec![webhook_domain_name], cert_tx) + .await + .context(CreateWebhookServerSnafu)?; + let current_cert = cert_rx + .recv() + .await + .context(ReceiverCertificateFromChannelSnafu)?; + + Ok(Self { + server, + current_cert, + client, + field_manager: field_manager.into(), + crds, + operator_environment, + }) } - /// Creates a new conversion webhook server **with** state which expects - /// POST requests being made to the `/convert` endpoint. - /// - /// Each request is handled by the provided `handler` function. Any function - /// with the signature `(ConversionReview, S) -> ConversionReview` can be - /// provided. 
The [`ConversionReview`] type can be imported via a re-export at - /// [`crate::servers::ConversionReview`]. + /// Starts the conversion webhook server /// - /// It is recommended to wrap the state in an [`Arc`][std::sync::Arc] if it - /// needs to be mutable, see - /// . - /// - /// # Example - /// - /// ``` - /// use std::sync::Arc; - /// - /// use stackable_webhook::{ - /// servers::{ConversionReview, ConversionWebhookServer}, - /// Options - /// }; - /// - /// #[derive(Debug, Clone)] - /// struct State {} - /// - /// let shared_state = Arc::new(State {}); - /// let server = ConversionWebhookServer::new_with_state( - /// handler, - /// shared_state, - /// Options::default(), - /// ); - /// - /// // Define the handler function - /// fn handler(req: ConversionReview, state: Arc) -> ConversionReview { - /// // In here we can do the CRD conversion - /// req - /// } - /// ``` - #[instrument(name = "create_conversion_webhook_server_with_state", skip(handler))] - pub fn new_with_state(handler: H, state: S, options: Options) -> Self - where - H: StatefulWebhookHandler - + Clone - + Send - + Sync - + 'static, - S: Clone + Debug + Send + Sync + 'static, - { - tracing::debug!("create new conversion webhook server with state"); - - // NOTE (@Techassi): Initially, after adding the state extractor, the - // compiler kept throwing a trait error at me stating that the closure - // below doesn't implement the Handler trait from Axum. This had nothing - // to do with the state itself, but rather the order of extractors. All - // body consuming extractors, like the JSON extractor need to come last - // in the handler. 
- // https://docs.rs/axum/latest/axum/extract/index.html#the-order-of-extractors - let handler_fn = |State(state): State, Json(review): Json| async { - let review = handler.call(review, state); - Json(review) - }; - - let router = Router::new() - .route("/convert", post(handler_fn)) - .with_state(state); - - Self { router, options } + /// Use [`Self::reconcile_crds`] first to avoid "too old resource version" error + pub async fn run(self) -> Result<(), ConversionWebhookError> { + tracing::info!("starting conversion webhook server"); + + self.server.run().await.context(RunWebhookServerSnafu)?; + + Ok(()) } - /// Starts the conversion webhook server by starting the underlying - /// [`WebhookServer`]. - pub async fn run(self) -> Result<(), crate::Error> { - tracing::info!("starting conversion webhook server"); + #[instrument(skip_all)] + pub async fn reconcile_crds(&self) -> Result<(), ConversionWebhookError> { + tracing::info!(kinds = ?self.crds.keys(), "Reconciling CRDs"); + let ca_bundle = self + .current_cert + .to_pem(LineEnding::LF) + .context(ConvertCaToPemSnafu)?; + + let crd_api: Api = Api::all(self.client.clone()); + for (kind, crd) in &self.crds { + let mut crd = crd.clone(); + + crd.spec.conversion = Some(CustomResourceConversion { + strategy: "Webhook".to_string(), + webhook: Some(WebhookConversion { + // conversionReviewVersions indicates what ConversionReview versions are understood/preferred by the webhook. + // The first version in the list understood by the API server is sent to the webhook. + // The webhook must respond with a ConversionReview object in the same version it received. 
+ conversion_review_versions: vec!["v1".to_string()], + client_config: Some(WebhookClientConfig { + service: Some(ServiceReference { + name: self.operator_environment.operator_service_name.clone(), + namespace: self.operator_environment.operator_namespace.clone(), + path: Some(format!("/convert/{kind}")), + port: Some( + DEFAULT_HTTPS_PORT + .try_into() + .expect("DEFAULT_HTTPS_PORT must be convertible into i32"), + ), + }), + ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())), + url: None, + }), + }), + }); - let server = WebhookServer::new(self.router, self.options); - server.run().await + // TODO: Move this into function and do a more clever update mechanism + let crd_name = crd.name_any(); + let patch = Patch::Apply(&crd); + let patch_params = PatchParams::apply(&self.field_manager); + crd_api + .patch(&crd_name, &patch_params, &patch) + .await + .with_context(|_| UpdateCRDSnafu { + crd_name: crd_name.to_string(), + })?; + tracing::info!(crd_name, "Reconciled CRDs"); + } + Ok(()) } } diff --git a/crates/stackable-webhook/src/servers/mod.rs b/crates/stackable-webhook/src/servers/mod.rs index b242df779..2d87b6cad 100644 --- a/crates/stackable-webhook/src/servers/mod.rs +++ b/crates/stackable-webhook/src/servers/mod.rs @@ -2,4 +2,5 @@ //! purposes. 
mod conversion; -pub use conversion::*; +pub use conversion::{ConversionWebhookError, ConversionWebhookServer}; +pub use kube::core::conversion::ConversionReview; diff --git a/crates/stackable-webhook/src/tls.rs b/crates/stackable-webhook/src/tls.rs index 2aad52ee4..951931124 100644 --- a/crates/stackable-webhook/src/tls.rs +++ b/crates/stackable-webhook/src/tls.rs @@ -9,11 +9,10 @@ use hyper_util::rt::{TokioExecutor, TokioIo}; use opentelemetry::trace::{FutureExt, SpanKind}; use snafu::{ResultExt, Snafu}; use stackable_certs::{ - CertificatePairError, - ca::{CertificateAuthority, DEFAULT_CA_VALIDITY}, - keys::rsa, + CertificatePair, CertificatePairError, DEFAULT_CERTIFICATE_VALIDITY, ca::CertificateAuthority, + keys::ecdsa, }; -use tokio::net::TcpListener; +use tokio::{net::TcpListener, sync::mpsc}; use tokio_rustls::{ TlsAcceptor, rustls::{ @@ -25,6 +24,7 @@ use tokio_rustls::{ use tower::{Service, ServiceExt}; use tracing::{Instrument, Span, field::Empty, instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; +use x509_cert::Certificate; pub type Result = std::result::Result; @@ -47,12 +47,12 @@ pub enum Error { #[snafu(display("failed to encode leaf certificate as DER"))] EncodeCertificateDer { - source: CertificatePairError, + source: CertificatePairError, }, #[snafu(display("failed to encode private key as DER"))] EncodePrivateKeyDer { - source: CertificatePairError, + source: CertificatePairError, }, #[snafu(display("failed to set safe TLS protocol versions"))] @@ -60,6 +60,9 @@ pub enum Error { #[snafu(display("failed to run task in blocking thread"))] TokioSpawnBlocking { source: tokio::task::JoinError }, + + #[snafu(display("failed send certificate to channel"))] + SendCertificateToChannel, } /// Custom implementation of [`std::cmp::PartialEq`] because some inner types @@ -91,56 +94,81 @@ impl PartialEq for Error { /// via HTTPS with the underlying HTTP router. 
pub struct TlsServer { config: Arc, + socket_addr: SocketAddr, router: Router, } impl TlsServer { #[instrument(name = "create_tls_server", skip(router))] - pub async fn new(socket_addr: SocketAddr, router: Router) -> Result { - // NOTE(@NickLarsenNZ): This code is not async, and does take some - // non-negligable amount of time to complete (moreso in debug ). - // We run this in a thread reserved for blocking code so that the Tokio - // executor is able to make progress on other futures instead of being - // blocked. - // See https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html - let task = tokio::task::spawn_blocking(move || { - let mut certificate_authority = - CertificateAuthority::new_rsa().context(CreateCertificateAuthoritySnafu)?; - - let leaf_certificate = certificate_authority - .generate_rsa_leaf_certificate("Leaf", "webhook", [], DEFAULT_CA_VALIDITY) - .context(GenerateLeafCertificateSnafu)?; - - let certificate_der = leaf_certificate - .certificate_der() - .context(EncodeCertificateDerSnafu)?; - - let private_key_der = leaf_certificate - .private_key_der() - .context(EncodePrivateKeyDerSnafu)?; - - let tls_provider = default_provider(); - let mut config = ServerConfig::builder_with_provider(tls_provider.into()) - .with_protocol_versions(&[&TLS12, &TLS13]) - .context(SetSafeTlsProtocolVersionsSnafu)? 
- .with_no_client_auth() - .with_single_cert(vec![certificate_der], private_key_der) - .context(InvalidTlsPrivateKeySnafu)?; - - config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; - let config = Arc::new(config); - - Ok(Self { - socket_addr, - config, - router, - }) + pub async fn new<'a>( + socket_addr: SocketAddr, + router: Router, + subject_alterative_dns_names: Vec, + cert_tx: mpsc::Sender, + ) -> Result { + // The certificate generations can take a while, so we use `spawn_blocking` + let (ca, certificate) = tokio::task::spawn_blocking(move || { + Self::generate_ca_and_certificate(subject_alterative_dns_names) }) .await .context(TokioSpawnBlockingSnafu)??; - Ok(task) + let config = Self::config_from_certificate(certificate)?; + + cert_tx + .send(ca.ca_cert().clone()) + .await + // We intentionally drop the error, as it contains the gigantic certificate + .map_err(|_err| Error::SendCertificateToChannel)?; + + Ok(Self { + config: Arc::new(config), + socket_addr, + router, + }) + } + + fn config_from_certificate( + certificate: CertificatePair, + ) -> Result { + let certificate_der = certificate + .certificate_der() + .context(EncodeCertificateDerSnafu)?; + let private_key_der = certificate + .private_key_der() + .context(EncodePrivateKeyDerSnafu)?; + + let tls_provider = default_provider(); + let mut config = ServerConfig::builder_with_provider(tls_provider.into()) + .with_protocol_versions(&[&TLS12, &TLS13]) + .context(SetSafeTlsProtocolVersionsSnafu)? 
+ .with_no_client_auth() + .with_single_cert(vec![certificate_der], private_key_der) + .context(InvalidTlsPrivateKeySnafu)?; + + config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + Ok(config) + } + + fn generate_ca_and_certificate( + subject_alterative_dns_names: Vec, + ) -> Result<( + CertificateAuthority, + CertificatePair, + )> { + let mut ca = CertificateAuthority::new_ecdsa().context(CreateCertificateAuthoritySnafu)?; + + let certificate = ca + .generate_ecdsa_leaf_certificate( + "Leaf", + "webhook", + subject_alterative_dns_names.iter().map(|san| san.as_str()), + DEFAULT_CERTIFICATE_VALIDITY, + ) + .context(GenerateLeafCertificateSnafu)?; + + Ok((ca, certificate)) } /// Runs the TLS server by listening for incoming TCP connections on the From ed5856341f883af914b5472838ab3ffdb875f06b Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 30 Jun 2025 14:52:02 +0200 Subject: [PATCH 02/31] Actually rotate certs --- Cargo.lock | 7 + Cargo.toml | 1 + crates/stackable-webhook/Cargo.toml | 1 + crates/stackable-webhook/src/lib.rs | 23 +-- .../src/servers/conversion.rs | 111 +++++++++--- .../src/tls/cert_resolver.rs | 145 +++++++++++++++ .../src/{tls.rs => tls/mod.rs} | 167 ++++++------------ 7 files changed, 304 insertions(+), 151 deletions(-) create mode 100644 crates/stackable-webhook/src/tls/cert_resolver.rs rename crates/stackable-webhook/src/{tls.rs => tls/mod.rs} (67%) diff --git a/Cargo.lock b/Cargo.lock index fbcd856fb..a9c8ccb7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -101,6 +101,12 @@ version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "async-broadcast" version = "0.7.2" @@ -3111,6 +3117,7 @@ dependencies = [ 
name = "stackable-webhook" version = "0.3.1" dependencies = [ + "arc-swap", "axum", "futures-util", "hyper", diff --git a/Cargo.toml b/Cargo.toml index 4bef733c1..61a9c335b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ repository = "https://github.com/stackabletech/operator-rs" [workspace.dependencies] product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" } +arc-swap = "1.7" axum = { version = "0.8.1", features = ["http2"] } chrono = { version = "0.4.38", default-features = false } clap = { version = "4.5.17", features = ["derive", "cargo", "env"] } diff --git a/crates/stackable-webhook/Cargo.toml b/crates/stackable-webhook/Cargo.toml index abf5aac00..3c2935c17 100644 --- a/crates/stackable-webhook/Cargo.toml +++ b/crates/stackable-webhook/Cargo.toml @@ -11,6 +11,7 @@ stackable-certs = { path = "../stackable-certs", features = ["rustls"] } stackable-telemetry = { path = "../stackable-telemetry" } stackable-operator = { path = "../stackable-operator" } +arc-swap.workspace = true axum.workspace = true futures-util.workspace = true hyper-util.workspace = true diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index a1f2e2786..191ca14d7 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -60,10 +60,10 @@ pub trait WebhookHandler { #[derive(Debug, Snafu)] pub enum WebhookError { #[snafu(display("failed to create TLS server"))] - CreateTlsServer { source: tls::Error }, + CreateTlsServer { source: tls::TlsServerError }, #[snafu(display("failed to run TLS server"))] - RunTlsServer { source: tls::Error }, + RunTlsServer { source: tls::TlsServerError }, } /// A ready-to-use webhook server. 
@@ -123,8 +123,7 @@ impl WebhookServer { router: Router, options: Options, subject_alterative_dns_names: Vec, - cert_tx: mpsc::Sender, - ) -> Result { + ) -> Result<(Self, mpsc::Receiver), WebhookError> { tracing::trace!("create new webhook server"); // TODO (@Techassi): Make opt-in configurable from the outside @@ -147,16 +146,12 @@ impl WebhookServer { .route("/health", get(|| async { "ok" })); tracing::debug!("create TLS server"); - let tls_server = TlsServer::new( - options.socket_addr, - router, - subject_alterative_dns_names, - cert_tx, - ) - .await - .context(CreateTlsServerSnafu)?; - - Ok(Self { tls_server }) + let (tls_server, cert_rx) = + TlsServer::new(options.socket_addr, router, subject_alterative_dns_names) + .await + .context(CreateTlsServerSnafu)?; + + Ok((Self { tls_server }, cert_rx)) } /// Runs the Webhook server and sets up signal handlers for shutting down. diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index 7877fd4a8..a8dddecb7 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -19,7 +19,7 @@ use kube::{ }; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::cli::OperatorEnvironmentOpts; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, try_join}; use tracing::instrument; use x509_cert::{ Certificate, @@ -44,6 +44,12 @@ pub enum ConversionWebhookError { #[snafu(display("failed to convert CA certificate into PEM format"))] ConvertCaToPem { source: x509_cert::der::Error }, + #[snafu(display("failed to reconcile CRDs"))] + ReconcileCRDs { + #[snafu(source(from(ConversionWebhookError, Box::new)))] + source: Box, + }, + #[snafu(display("failed to update CRD {crd_name:?}"))] UpdateCRD { source: stackable_operator::kube::Error, @@ -65,8 +71,7 @@ where /// See [`ConversionWebhookServer::new()`] for usage examples. 
pub struct ConversionWebhookServer { server: WebhookServer, - current_cert: Certificate, - + cert_rx: mpsc::Receiver, client: Client, field_manager: String, crds: HashMap, @@ -144,6 +149,7 @@ impl ConversionWebhookServer { H: WebhookHandler + Clone + Send + Sync + 'static, { tracing::debug!("create new conversion webhook server"); + let field_manager: String = field_manager.into(); let mut router = Router::new(); let mut crds = HashMap::new(); @@ -160,52 +166,111 @@ impl ConversionWebhookServer { // This is how Kubernetes calls us, so it decides about the naming. // AFAIK we can not influence this, so this is the only SAN entry needed. - let webhook_domain_name = format!( + let sans = vec![format!( "{service_name}.{operator_namespace}.svc", service_name = operator_environment.operator_service_name, operator_namespace = operator_environment.operator_namespace, - ); + )]; - let (cert_tx, mut cert_rx) = mpsc::channel(1); - let server = WebhookServer::new(router, options, vec![webhook_domain_name], cert_tx) + let (server, mut cert_rx) = WebhookServer::new(router, options, sans) .await .context(CreateWebhookServerSnafu)?; + + // We block the ConversionWebhookServer creation until the certificates have been generated. + // This way we + // 1. Are able to apply the CRDs before we start the actual controllers relying on them + // 2. Avoid updating them shortly after as cert have been generated. Doing so would cause + // unnecessary "too old resource version" errors in the controllers as the CRD was updated. 
let current_cert = cert_rx .recv() .await .context(ReceiverCertificateFromChannelSnafu)?; + Self::reconcile_crds( + &client, + &field_manager, + &crds, + &operator_environment, + ¤t_cert, + ) + .await + .context(ReconcileCRDsSnafu)?; Ok(Self { server, - current_cert, + cert_rx, client, - field_manager: field_manager.into(), + field_manager, crds, operator_environment, }) } - /// Starts the conversion webhook server - /// - /// Use [`Self::reconcile_crds`] first to avoid "too old resource version" error pub async fn run(self) -> Result<(), ConversionWebhookError> { tracing::info!("starting conversion webhook server"); - self.server.run().await.context(RunWebhookServerSnafu)?; + let Self { + server, + cert_rx, + client, + field_manager, + crds, + operator_environment, + } = self; + try_join!( + Self::run_webhook_server(server), + Self::run_cert_update_loop( + cert_rx, + &client, + &field_manager, + &crds, + &operator_environment + ), + )?; + + Ok(()) + } + + async fn run_webhook_server(server: WebhookServer) -> Result<(), ConversionWebhookError> { + server.run().await.context(RunWebhookServerSnafu) + } + + async fn run_cert_update_loop( + mut cert_rx: mpsc::Receiver, + client: &Client, + field_manager: &str, + crds: &HashMap, + operator_environment: &OperatorEnvironmentOpts, + ) -> Result<(), ConversionWebhookError> { + while let Some(current_cert) = cert_rx.recv().await { + Self::reconcile_crds( + client, + field_manager, + crds, + operator_environment, + ¤t_cert, + ) + .await + .context(ReconcileCRDsSnafu)?; + } Ok(()) } #[instrument(skip_all)] - pub async fn reconcile_crds(&self) -> Result<(), ConversionWebhookError> { - tracing::info!(kinds = ?self.crds.keys(), "Reconciling CRDs"); - let ca_bundle = self - .current_cert + async fn reconcile_crds( + client: &Client, + field_manager: &str, + crds: &HashMap, + operator_environment: &OperatorEnvironmentOpts, + current_cert: &Certificate, + ) -> Result<(), ConversionWebhookError> { + tracing::info!(kinds = 
?crds.keys(), "Reconciling CRDs"); + let ca_bundle = current_cert .to_pem(LineEnding::LF) .context(ConvertCaToPemSnafu)?; - let crd_api: Api = Api::all(self.client.clone()); - for (kind, crd) in &self.crds { + let crd_api: Api = Api::all(client.clone()); + for (kind, crd) in crds { let mut crd = crd.clone(); crd.spec.conversion = Some(CustomResourceConversion { @@ -217,8 +282,8 @@ impl ConversionWebhookServer { conversion_review_versions: vec!["v1".to_string()], client_config: Some(WebhookClientConfig { service: Some(ServiceReference { - name: self.operator_environment.operator_service_name.clone(), - namespace: self.operator_environment.operator_namespace.clone(), + name: operator_environment.operator_service_name.clone(), + namespace: operator_environment.operator_namespace.clone(), path: Some(format!("/convert/{kind}")), port: Some( DEFAULT_HTTPS_PORT @@ -235,14 +300,14 @@ impl ConversionWebhookServer { // TODO: Move this into function and do a more clever update mechanism let crd_name = crd.name_any(); let patch = Patch::Apply(&crd); - let patch_params = PatchParams::apply(&self.field_manager); + let patch_params = PatchParams::apply(field_manager); crd_api .patch(&crd_name, &patch_params, &patch) .await .with_context(|_| UpdateCRDSnafu { crd_name: crd_name.to_string(), })?; - tracing::info!(crd_name, "Reconciled CRDs"); + tracing::info!(crd.name = crd_name, "Reconciled CRD"); } Ok(()) } diff --git a/crates/stackable-webhook/src/tls/cert_resolver.rs b/crates/stackable-webhook/src/tls/cert_resolver.rs new file mode 100644 index 000000000..b3d11a5c8 --- /dev/null +++ b/crates/stackable-webhook/src/tls/cert_resolver.rs @@ -0,0 +1,145 @@ +use std::sync::Arc; + +use arc_swap::ArcSwap; +use snafu::{ResultExt, Snafu}; +use stackable_certs::{CertificatePairError, ca::CertificateAuthority, keys::ecdsa}; +use tokio::sync::mpsc; +use tokio_rustls::rustls::{ + crypto::ring::default_provider, server::ResolvesServerCert, sign::CertifiedKey, +}; +use x509_cert::Certificate; + 
+use super::WEBHOOK_CERTIFICATE_LIFETIME; + +type Result = std::result::Result; + +#[derive(Debug, Snafu)] +pub enum CertificateResolverError { + #[snafu(display("failed send certificate to channel"))] + SendCertificateToChannel, + + #[snafu(display("failed to generate new certificate"))] + GenerateNewCertificate { + #[snafu(source(from(CertificateResolverError, Box::new)))] + source: Box, + }, + + #[snafu(display("failed to create CA to generate and sign webhook leaf certificate"))] + CreateCertificateAuthority { source: stackable_certs::ca::Error }, + + #[snafu(display("failed to generate webhook leaf certificate"))] + GenerateLeafCertificate { source: stackable_certs::ca::Error }, + + #[snafu(display("failed to encode leaf certificate as DER"))] + EncodeCertificateDer { + source: CertificatePairError, + }, + + #[snafu(display("failed to encode private key as DER"))] + EncodePrivateKeyDer { + source: CertificatePairError, + }, + + #[snafu(display("failed to decode CertifiedKey from DER"))] + DecodeCertifiedKeyFromDer { source: tokio_rustls::rustls::Error }, + + #[snafu(display("failed to run task in blocking thread"))] + TokioSpawnBlocking { source: tokio::task::JoinError }, +} + +#[derive(Debug)] +pub struct CertificateResolver { + /// Using a [`ArcSwap`] (over e.g. [`tokio::sync::RwLock`]), so that we can easily + /// (and performant) bridge between async write and sync write. 
+ current_certified_key: ArcSwap, + subject_alterative_dns_names: Arc>, + + cert_tx: mpsc::Sender, +} + +impl CertificateResolver { + pub async fn new( + subject_alterative_dns_names: Vec, + cert_tx: mpsc::Sender, + ) -> Result { + let subject_alterative_dns_names = Arc::new(subject_alterative_dns_names); + let (cert, certified_key) = Self::generate_new_cert(subject_alterative_dns_names.clone()) + .await + .context(GenerateNewCertificateSnafu)?; + + cert_tx + .send(cert) + .await + .map_err(|_err| CertificateResolverError::SendCertificateToChannel)?; + + Ok(Self { + subject_alterative_dns_names, + current_certified_key: ArcSwap::new(certified_key), + cert_tx, + }) + } + + pub async fn rotate_certificate(&self) -> Result<()> { + let (cert, certified_key) = + Self::generate_new_cert(self.subject_alterative_dns_names.clone()) + .await + .context(GenerateNewCertificateSnafu)?; + + // TODO: Sign the new cert somehow with the old cert. See https://github.com/stackabletech/decisions/issues/56 + + self.cert_tx + .send(cert) + .await + .map_err(|_err| CertificateResolverError::SendCertificateToChannel)?; + + self.current_certified_key.store(certified_key); + + Ok(()) + } + + /// FIXME: This should *not* construct a CA cert and cert, but only a cert! + /// This needs some changes in stackable-certs though. 
+ async fn generate_new_cert( + subject_alterative_dns_names: Arc>, + ) -> Result<(Certificate, Arc)> { + // The certificate generations can take a while, so we use `spawn_blocking` + tokio::task::spawn_blocking(move || { + let tls_provider = default_provider(); + + let mut ca = + CertificateAuthority::new_ecdsa().context(CreateCertificateAuthoritySnafu)?; + + let certificate = ca + .generate_ecdsa_leaf_certificate( + "Leaf", + "webhook", + subject_alterative_dns_names.iter().map(|san| san.as_str()), + WEBHOOK_CERTIFICATE_LIFETIME, + ) + .context(GenerateLeafCertificateSnafu)?; + + let certificate_der = certificate + .certificate_der() + .context(EncodeCertificateDerSnafu)?; + let private_key_der = certificate + .private_key_der() + .context(EncodePrivateKeyDerSnafu)?; + let certificate_key = + CertifiedKey::from_der(vec![certificate_der], private_key_der, &tls_provider) + .context(DecodeCertifiedKeyFromDerSnafu)?; + + Ok((certificate.certificate().clone(), Arc::new(certificate_key))) + }) + .await + .context(TokioSpawnBlockingSnafu)? 
+ } +} + +impl ResolvesServerCert for CertificateResolver { + fn resolve( + &self, + _client_hello: tokio_rustls::rustls::server::ClientHello<'_>, + ) -> Option> { + Some(self.current_certified_key.load().clone()) + } +} diff --git a/crates/stackable-webhook/src/tls.rs b/crates/stackable-webhook/src/tls/mod.rs similarity index 67% rename from crates/stackable-webhook/src/tls.rs rename to crates/stackable-webhook/src/tls/mod.rs index 951931124..9d7e54add 100644 --- a/crates/stackable-webhook/src/tls.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -3,16 +3,14 @@ use std::{net::SocketAddr, sync::Arc}; use axum::{Router, extract::Request}; +use cert_resolver::{CertificateResolver, CertificateResolverError}; use futures_util::pin_mut; use hyper::{body::Incoming, service::service_fn}; use hyper_util::rt::{TokioExecutor, TokioIo}; use opentelemetry::trace::{FutureExt, SpanKind}; use snafu::{ResultExt, Snafu}; -use stackable_certs::{ - CertificatePair, CertificatePairError, DEFAULT_CERTIFICATE_VALIDITY, ca::CertificateAuthority, - keys::ecdsa, -}; -use tokio::{net::TcpListener, sync::mpsc}; +use stackable_operator::time::Duration; +use tokio::{net::TcpListener, sync::mpsc, time::interval}; use tokio_rustls::{ TlsAcceptor, rustls::{ @@ -26,12 +24,17 @@ use tracing::{Instrument, Span, field::Empty, instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; use x509_cert::Certificate; -pub type Result = std::result::Result; +mod cert_resolver; + +pub const WEBHOOK_CERTIFICATE_LIFETIME: Duration = Duration::from_minutes_unchecked(2); +pub const WEBHOOK_CERTIFICATE_ROTATION_INTERVAL: Duration = Duration::from_minutes_unchecked(1); + +pub type Result = std::result::Result; #[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("failed to construct TLS server config, bad certificate/key"))] - InvalidTlsPrivateKey { source: tokio_rustls::rustls::Error }, +pub enum TlsServerError { + #[snafu(display("failed to create certificate resolver"))] + CreateCertificateResolver 
{ source: CertificateResolverError }, #[snafu(display("failed to create TCP listener by binding to socket address {socket_addr:?}"))] BindTcpListener { @@ -39,61 +42,18 @@ pub enum Error { socket_addr: SocketAddr, }, - #[snafu(display("failed to create CA to generate and sign webhook leaf certificate"))] - CreateCertificateAuthority { source: stackable_certs::ca::Error }, - - #[snafu(display("failed to generate webhook leaf certificate"))] - GenerateLeafCertificate { source: stackable_certs::ca::Error }, - - #[snafu(display("failed to encode leaf certificate as DER"))] - EncodeCertificateDer { - source: CertificatePairError, - }, - - #[snafu(display("failed to encode private key as DER"))] - EncodePrivateKeyDer { - source: CertificatePairError, - }, + #[snafu(display("failed to rotate certificate"))] + RotateCertificate { source: CertificateResolverError }, #[snafu(display("failed to set safe TLS protocol versions"))] SetSafeTlsProtocolVersions { source: tokio_rustls::rustls::Error }, - - #[snafu(display("failed to run task in blocking thread"))] - TokioSpawnBlocking { source: tokio::task::JoinError }, - - #[snafu(display("failed send certificate to channel"))] - SendCertificateToChannel, -} - -/// Custom implementation of [`std::cmp::PartialEq`] because some inner types -/// don't implement it. -/// -/// Note that this implementation is restritced to testing because there are -/// variants that use [`stackable_certs::ca::Error`] which only implements -/// [`PartialEq`] for tests. 
-#[cfg(test)] -impl PartialEq for Error { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - ( - Self::BindTcpListener { - source: lhs_source, - socket_addr: lhs_socket_addr, - }, - Self::BindTcpListener { - source: rhs_source, - socket_addr: rhs_socket_addr, - }, - ) => lhs_socket_addr == rhs_socket_addr && lhs_source.kind() == rhs_source.kind(), - (lhs, rhs) => lhs == rhs, - } - } } -/// A server which terminates TLS connections and allows clients to commnunicate +/// A server which terminates TLS connections and allows clients to communicate /// via HTTPS with the underlying HTTP router. pub struct TlsServer { - config: Arc, + config: ServerConfig, + cert_resolver: Arc, socket_addr: SocketAddr, router: Router, @@ -105,70 +65,31 @@ impl TlsServer { socket_addr: SocketAddr, router: Router, subject_alterative_dns_names: Vec, - cert_tx: mpsc::Sender, - ) -> Result { - // The certificate generations can take a while, so we use `spawn_blocking` - let (ca, certificate) = tokio::task::spawn_blocking(move || { - Self::generate_ca_and_certificate(subject_alterative_dns_names) - }) - .await - .context(TokioSpawnBlockingSnafu)??; - - let config = Self::config_from_certificate(certificate)?; - - cert_tx - .send(ca.ca_cert().clone()) - .await - // We intentionally drop the error, as it contains the gigantic certificate - .map_err(|_err| Error::SendCertificateToChannel)?; - - Ok(Self { - config: Arc::new(config), - socket_addr, - router, - }) - } - - fn config_from_certificate( - certificate: CertificatePair, - ) -> Result { - let certificate_der = certificate - .certificate_der() - .context(EncodeCertificateDerSnafu)?; - let private_key_der = certificate - .private_key_der() - .context(EncodePrivateKeyDerSnafu)?; + ) -> Result<(Self, mpsc::Receiver)> { + let (cert_tx, cert_rx) = mpsc::channel(1); + let cert_resolver = Arc::new( + CertificateResolver::new(subject_alterative_dns_names, cert_tx) + .await + .context(CreateCertificateResolverSnafu)?, + ); let 
tls_provider = default_provider(); let mut config = ServerConfig::builder_with_provider(tls_provider.into()) .with_protocol_versions(&[&TLS12, &TLS13]) .context(SetSafeTlsProtocolVersionsSnafu)? .with_no_client_auth() - .with_single_cert(vec![certificate_der], private_key_der) - .context(InvalidTlsPrivateKeySnafu)?; - + .with_cert_resolver(cert_resolver.clone()); config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; - Ok(config) - } - fn generate_ca_and_certificate( - subject_alterative_dns_names: Vec, - ) -> Result<( - CertificateAuthority, - CertificatePair, - )> { - let mut ca = CertificateAuthority::new_ecdsa().context(CreateCertificateAuthoritySnafu)?; - - let certificate = ca - .generate_ecdsa_leaf_certificate( - "Leaf", - "webhook", - subject_alterative_dns_names.iter().map(|san| san.as_str()), - DEFAULT_CERTIFICATE_VALIDITY, - ) - .context(GenerateLeafCertificateSnafu)?; - - Ok((ca, certificate)) + Ok(( + Self { + config, + cert_resolver, + socket_addr, + router, + }, + cert_rx, + )) } /// Runs the TLS server by listening for incoming TCP connections on the @@ -176,7 +97,9 @@ impl TlsServer { /// TLS stream get handled by a Hyper service, which in turn is an Axum /// router. pub async fn run(self) -> Result<()> { - let tls_acceptor = TlsAcceptor::from(self.config); + tokio::spawn(async { Self::run_certificate_rotation_loop(self.cert_resolver).await }); + + let tls_acceptor = TlsAcceptor::from(Arc::new(self.config)); let tcp_listener = TcpListener::bind(self.socket_addr) .await @@ -292,6 +215,22 @@ impl TlsServer { ); } } + + async fn run_certificate_rotation_loop(cert_resolver: Arc) -> Result<()> { + let mut interval = interval(*WEBHOOK_CERTIFICATE_ROTATION_INTERVAL); + // Let the interval tick once, so that the first loop iteration does not start immediately, + // thus generating a new cert. 
+ interval.tick().await; + + loop { + interval.tick().await; + + cert_resolver + .rotate_certificate() + .await + .context(RotateCertificateSnafu)?; + } + } } pub trait SocketAddrExt { From 3be083e407a68a2580dcd0f4a604cc957b9469f9 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 30 Jun 2025 14:53:21 +0200 Subject: [PATCH 03/31] clippy --- crates/stackable-webhook/src/servers/conversion.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index a8dddecb7..e84d60f0b 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -285,11 +285,7 @@ impl ConversionWebhookServer { name: operator_environment.operator_service_name.clone(), namespace: operator_environment.operator_namespace.clone(), path: Some(format!("/convert/{kind}")), - port: Some( - DEFAULT_HTTPS_PORT - .try_into() - .expect("DEFAULT_HTTPS_PORT must be convertible into i32"), - ), + port: Some(DEFAULT_HTTPS_PORT.into()), }), ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())), url: None, From fd4d94b51f5a1304167979d9d7800be1ee90ac0e Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 30 Jun 2025 15:01:16 +0200 Subject: [PATCH 04/31] doctests --- crates/stackable-webhook/src/lib.rs | 9 +++------ crates/stackable-webhook/src/servers/conversion.rs | 3 +-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 191ca14d7..f542706df 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -12,9 +12,8 @@ //! use axum::Router; //! use tokio::sync::mpsc; //! -//! let (cert_tx, _cert_rx) = mpsc::channel(1); //! let router = Router::new(); -//! let server = WebhookServer::new(router, Options::default(), vec![], cert_tx); +//! 
let server = WebhookServer::new(router, Options::default(), vec![]); //! ``` //! //! For some usages, complete end-to-end [`WebhookServer`] implementations @@ -98,9 +97,8 @@ impl WebhookServer { /// use axum::Router; /// use tokio::sync::mpsc; /// - /// let (cert_tx, _cert_rx) = mpsc::channel(1); /// let router = Router::new(); - /// let server = WebhookServer::new(router, Options::default(), vec![], cert_tx); + /// let server = WebhookServer::new(router, Options::default(), vec![]); /// ``` /// /// ### Example with Custom Options @@ -110,14 +108,13 @@ impl WebhookServer { /// use axum::Router; /// use tokio::sync::mpsc; /// - /// let (cert_tx, _cert_rx) = mpsc::channel(1); /// let options = Options::builder() /// .bind_address([127, 0, 0, 1], 8080) /// .build(); /// let sans = vec!["my-san-entry".to_string()]; /// /// let router = Router::new(); - /// let server = WebhookServer::new(router, options, sans, cert_tx); + /// let server = WebhookServer::new(router, options, sans); /// ``` pub async fn new( router: Router, diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index e84d60f0b..c163a8dfc 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -130,8 +130,7 @@ impl ConversionWebhookServer { /// .await /// .expect("failed to create ConversionWebhookServer"); /// - /// // Bootstrap CRDs first to avoid "too old resource version" error - /// conversion_webhook.reconcile_crds().await.expect("failed to reconcile CRDs"); + /// conversion_webhook.run().await.expect("failed to run ConversionWebhookServer"); /// # } /// ``` #[instrument( From b3c17b3f8e77144966871187db8b6042be76b822 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 30 Jun 2025 15:46:24 +0200 Subject: [PATCH 05/31] HashMap -> Vec --- .../src/servers/conversion.rs | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git 
a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index c163a8dfc..bc9ed29c0 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, fmt::Debug}; +use std::fmt::Debug; use axum::{Json, Router, routing::post}; use k8s_openapi::{ @@ -74,7 +74,7 @@ pub struct ConversionWebhookServer { cert_rx: mpsc::Receiver, client: Client, field_manager: String, - crds: HashMap, + crds: Vec, operator_environment: OperatorEnvironmentOpts, } @@ -106,7 +106,7 @@ impl ConversionWebhookServer { /// # async fn test() { /// let crds_and_handlers = [ /// ( - /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1).unwrap(), + /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1).expect("failed to merge S3Connection CRD"), /// S3Connection::try_convert as fn(ConversionReview) -> ConversionReview, /// ), /// ]; @@ -151,7 +151,7 @@ impl ConversionWebhookServer { let field_manager: String = field_manager.into(); let mut router = Router::new(); - let mut crds = HashMap::new(); + let mut crds = Vec::new(); for (crd, handler) in crds_and_handlers { let crd_name = crd.name_any(); let handler_fn = |Json(review): Json| async { @@ -160,7 +160,7 @@ impl ConversionWebhookServer { }; router = router.route(&format!("/convert/{crd_name}"), post(handler_fn)); - crds.insert(crd_name, crd); + crds.push(crd); } // This is how Kubernetes calls us, so it decides about the naming. 
@@ -238,7 +238,7 @@ impl ConversionWebhookServer { mut cert_rx: mpsc::Receiver, client: &Client, field_manager: &str, - crds: &HashMap, + crds: &[CustomResourceDefinition], operator_environment: &OperatorEnvironmentOpts, ) -> Result<(), ConversionWebhookError> { while let Some(current_cert) = cert_rx.recv().await { @@ -259,18 +259,21 @@ impl ConversionWebhookServer { async fn reconcile_crds( client: &Client, field_manager: &str, - crds: &HashMap, + crds: &[CustomResourceDefinition], operator_environment: &OperatorEnvironmentOpts, current_cert: &Certificate, ) -> Result<(), ConversionWebhookError> { - tracing::info!(kinds = ?crds.keys(), "Reconciling CRDs"); + tracing::info!( + crds = ?crds.iter().map(CustomResourceDefinition::name_any).collect::>(), + "Reconciling CRDs" + ); let ca_bundle = current_cert .to_pem(LineEnding::LF) .context(ConvertCaToPemSnafu)?; let crd_api: Api = Api::all(client.clone()); - for (kind, crd) in crds { - let mut crd = crd.clone(); + for mut crd in crds.iter().cloned() { + let crd_name = crd.name_any(); crd.spec.conversion = Some(CustomResourceConversion { strategy: "Webhook".to_string(), @@ -283,7 +286,7 @@ impl ConversionWebhookServer { service: Some(ServiceReference { name: operator_environment.operator_service_name.clone(), namespace: operator_environment.operator_namespace.clone(), - path: Some(format!("/convert/{kind}")), + path: Some(format!("/convert/{crd_name}")), port: Some(DEFAULT_HTTPS_PORT.into()), }), ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())), @@ -292,8 +295,6 @@ impl ConversionWebhookServer { }), }); - // TODO: Move this into function and do a more clever update mechanism - let crd_name = crd.name_any(); let patch = Patch::Apply(&crd); let patch_params = PatchParams::apply(field_manager); crd_api @@ -302,7 +303,6 @@ impl ConversionWebhookServer { .with_context(|_| UpdateCRDSnafu { crd_name: crd_name.to_string(), })?; - tracing::info!(crd.name = crd_name, "Reconciled CRD"); } Ok(()) } From 
bdbcee0039c8d23921c2234018ce8b774a0c34f6 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 30 Jun 2025 15:51:05 +0200 Subject: [PATCH 06/31] Set correct CA lifetime --- Cargo.lock | 1 + crates/stackable-certs/src/ca/mod.rs | 2 +- crates/stackable-certs/src/lib.rs | 4 ---- crates/stackable-webhook/Cargo.toml | 1 + crates/stackable-webhook/src/tls/cert_resolver.rs | 9 +++++++-- crates/stackable-webhook/src/tls/mod.rs | 1 + 6 files changed, 11 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9c8ccb7b..f13ee0181 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3125,6 +3125,7 @@ dependencies = [ "k8s-openapi", "kube", "opentelemetry", + "rand 0.9.1", "serde_json", "snafu 0.8.5", "stackable-certs", diff --git a/crates/stackable-certs/src/ca/mod.rs b/crates/stackable-certs/src/ca/mod.rs index b2e464b45..f9c2a2f26 100644 --- a/crates/stackable-certs/src/ca/mod.rs +++ b/crates/stackable-certs/src/ca/mod.rs @@ -38,7 +38,7 @@ pub enum Error { #[snafu(display("failed to generate RSA signing key"))] GenerateRsaSigningKey { source: rsa::Error }, - #[snafu(display("failed to generate ECDSA signign key"))] + #[snafu(display("failed to generate ECDSA signing key"))] GenerateEcdsaSigningKey { source: ecdsa::Error }, #[snafu(display("failed to parse {subject:?} as subject"))] diff --git a/crates/stackable-certs/src/lib.rs b/crates/stackable-certs/src/lib.rs index 122b1dd53..5b9c87327 100644 --- a/crates/stackable-certs/src/lib.rs +++ b/crates/stackable-certs/src/lib.rs @@ -22,7 +22,6 @@ use std::ops::Deref; use snafu::Snafu; -use stackable_operator::time::Duration; use x509_cert::{Certificate, spki::EncodePublicKey}; #[cfg(feature = "rustls")] use { @@ -37,9 +36,6 @@ use crate::keys::CertificateKeypair; pub mod ca; pub mod keys; -/// The default certificate validity time span -pub const DEFAULT_CERTIFICATE_VALIDITY: Duration = Duration::from_hours_unchecked(1); - /// Error variants which can be encountered when creating a new /// [`CertificatePair`]. 
#[derive(Debug, Snafu)] diff --git a/crates/stackable-webhook/Cargo.toml b/crates/stackable-webhook/Cargo.toml index 3c2935c17..178e02e6c 100644 --- a/crates/stackable-webhook/Cargo.toml +++ b/crates/stackable-webhook/Cargo.toml @@ -19,6 +19,7 @@ hyper.workspace = true k8s-openapi.workspace = true kube.workspace = true opentelemetry.workspace = true +rand.workspace = true serde_json.workspace = true snafu.workspace = true tokio-rustls.workspace = true diff --git a/crates/stackable-webhook/src/tls/cert_resolver.rs b/crates/stackable-webhook/src/tls/cert_resolver.rs index b3d11a5c8..f2162728b 100644 --- a/crates/stackable-webhook/src/tls/cert_resolver.rs +++ b/crates/stackable-webhook/src/tls/cert_resolver.rs @@ -9,7 +9,7 @@ use tokio_rustls::rustls::{ }; use x509_cert::Certificate; -use super::WEBHOOK_CERTIFICATE_LIFETIME; +use super::{WEBHOOK_CA_LIFETIME, WEBHOOK_CERTIFICATE_LIFETIME}; type Result = std::result::Result; @@ -18,6 +18,9 @@ pub enum CertificateResolverError { #[snafu(display("failed send certificate to channel"))] SendCertificateToChannel, + #[snafu(display("failed to generate ECDSA signing key"))] + GenerateEcdsaSigningKey { source: ecdsa::Error }, + #[snafu(display("failed to generate new certificate"))] GenerateNewCertificate { #[snafu(source(from(CertificateResolverError, Box::new)))] @@ -106,8 +109,10 @@ impl CertificateResolver { tokio::task::spawn_blocking(move || { let tls_provider = default_provider(); + let ca_key = ecdsa::SigningKey::new().context(GenerateEcdsaSigningKeySnafu)?; let mut ca = - CertificateAuthority::new_ecdsa().context(CreateCertificateAuthoritySnafu)?; + CertificateAuthority::new_with(ca_key, rand::random::(), WEBHOOK_CA_LIFETIME) + .context(CreateCertificateAuthoritySnafu)?; let certificate = ca .generate_ecdsa_leaf_certificate( diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index 9d7e54add..f86db7a1c 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ 
b/crates/stackable-webhook/src/tls/mod.rs @@ -26,6 +26,7 @@ use x509_cert::Certificate; mod cert_resolver; +pub const WEBHOOK_CA_LIFETIME: Duration = Duration::from_minutes_unchecked(3); pub const WEBHOOK_CERTIFICATE_LIFETIME: Duration = Duration::from_minutes_unchecked(2); pub const WEBHOOK_CERTIFICATE_ROTATION_INTERVAL: Duration = Duration::from_minutes_unchecked(1); From b4c22b0469035d6513731a77d6cceb8b412b182b Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 30 Jun 2025 15:55:18 +0200 Subject: [PATCH 07/31] Result type --- crates/stackable-webhook/src/lib.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index f542706df..df6d181ff 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -56,6 +56,9 @@ pub trait WebhookHandler { fn call(self, req: Req) -> Res; } +/// A result type alias with the library-level [`Error`] type as the default error type. +pub type Result = std::result::Result; + #[derive(Debug, Snafu)] pub enum WebhookError { #[snafu(display("failed to create TLS server"))] @@ -120,7 +123,7 @@ impl WebhookServer { router: Router, options: Options, subject_alterative_dns_names: Vec, - ) -> Result<(Self, mpsc::Receiver), WebhookError> { + ) -> Result<(Self, mpsc::Receiver)> { tracing::trace!("create new webhook server"); // TODO (@Techassi): Make opt-in configurable from the outside @@ -154,7 +157,7 @@ impl WebhookServer { /// Runs the Webhook server and sets up signal handlers for shutting down. /// /// This does not implement graceful shutdown of the underlying server. 
- pub async fn run(self) -> Result<(), WebhookError> { + pub async fn run(self) -> Result<()> { let future_server = self.run_server(); let future_signal = async { let mut sigint = signal(SignalKind::interrupt()).expect("create SIGINT listener"); @@ -187,7 +190,7 @@ impl WebhookServer { /// Runs the webhook server by creating a TCP listener and binding it to /// the specified socket address. - async fn run_server(self) -> Result<(), WebhookError> { + async fn run_server(self) -> Result<()> { tracing::debug!("run webhook server"); self.tls_server.run().await.context(RunTlsServerSnafu) From 31b128b418fdb06f61d9d2116bc3937e58ac7d26 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 30 Jun 2025 15:58:21 +0200 Subject: [PATCH 08/31] fix rustdocs --- crates/stackable-webhook/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index df6d181ff..49335ed03 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -56,7 +56,7 @@ pub trait WebhookHandler { fn call(self, req: Req) -> Res; } -/// A result type alias with the library-level [`Error`] type as the default error type. +/// A result type alias with the [`WebhookError`] type as the default error type. 
pub type Result = std::result::Result; #[derive(Debug, Snafu)] From b84cf053dc092aea21656ae70ec7e5abe14f3071 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 30 Jun 2025 16:06:14 +0200 Subject: [PATCH 09/31] Add some docs --- crates/stackable-webhook/src/servers/conversion.rs | 6 ++++-- crates/stackable-webhook/src/tls/cert_resolver.rs | 5 +++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index bc9ed29c0..2bf0a52e7 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -106,14 +106,16 @@ impl ConversionWebhookServer { /// # async fn test() { /// let crds_and_handlers = [ /// ( - /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1).expect("failed to merge S3Connection CRD"), + /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1) + /// .expect("failed to merge S3Connection CRD"), /// S3Connection::try_convert as fn(ConversionReview) -> ConversionReview, /// ), /// ]; /// /// const OPERATOR_NAME: &str = "PRODUCT_OPERATOR"; /// let client = Client::try_default().await.expect("failed to create Kubernetes client"); - /// // Normally you would get this from the CLI arguments in `ProductOperatorRun::operator_environment` + /// // Normally you would get this from the CLI arguments in + /// // `ProductOperatorRun::operator_environment` /// let operator_environment = OperatorEnvironmentOpts { /// operator_namespace: "stackable-operator".to_string(), /// operator_service_name: "product-operator".to_string(), diff --git a/crates/stackable-webhook/src/tls/cert_resolver.rs b/crates/stackable-webhook/src/tls/cert_resolver.rs index f2162728b..cd63d34a0 100644 --- a/crates/stackable-webhook/src/tls/cert_resolver.rs +++ b/crates/stackable-webhook/src/tls/cert_resolver.rs @@ -50,6 +50,11 @@ pub enum CertificateResolverError { TokioSpawnBlocking { source: 
tokio::task::JoinError }, } +/// This struct serves as [`ResolvesServerCert`] to always hand out the current certificate for TLS +/// client connections. +/// +/// It offers the [`Self::rotate_certificate`] function to create a fresh certificate and basically +/// hot-reload the certificate in the running webhook. #[derive(Debug)] pub struct CertificateResolver { /// Using a [`ArcSwap`] (over e.g. [`tokio::sync::RwLock`]), so that we can easily From 0390680babd4eaec81b6c3e57c95f37ad39c8ec9 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 2 Jul 2025 07:30:46 +0200 Subject: [PATCH 10/31] Update rustdoc --- crates/stackable-webhook/src/servers/conversion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index 2bf0a52e7..e50552e84 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -108,7 +108,7 @@ impl ConversionWebhookServer { /// ( /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1) /// .expect("failed to merge S3Connection CRD"), - /// S3Connection::try_convert as fn(ConversionReview) -> ConversionReview, + /// S3Connection::try_convert as fn(_) -> _, /// ), /// ]; /// From 2e1d085684275298ca97718707d9edbeb7e9fa4b Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 2 Jul 2025 08:09:18 +0200 Subject: [PATCH 11/31] Handle cert rotation loop failures --- .../src/servers/conversion.rs | 4 ++-- crates/stackable-webhook/src/tls/mod.rs | 23 +++++++++++++++---- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index e50552e84..5e8ef62b0 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -220,7 +220,7 @@ impl ConversionWebhookServer { try_join!( 
Self::run_webhook_server(server), - Self::run_cert_update_loop( + Self::run_crd_reconciliation_loop( cert_rx, &client, &field_manager, @@ -236,7 +236,7 @@ impl ConversionWebhookServer { server.run().await.context(RunWebhookServerSnafu) } - async fn run_cert_update_loop( + async fn run_crd_reconciliation_loop( mut cert_rx: mpsc::Receiver, client: &Client, field_manager: &str, diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index f86db7a1c..d1e1e4afc 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -10,7 +10,7 @@ use hyper_util::rt::{TokioExecutor, TokioIo}; use opentelemetry::trace::{FutureExt, SpanKind}; use snafu::{ResultExt, Snafu}; use stackable_operator::time::Duration; -use tokio::{net::TcpListener, sync::mpsc, time::interval}; +use tokio::{net::TcpListener, select, sync::mpsc, time::interval}; use tokio_rustls::{ TlsAcceptor, rustls::{ @@ -48,6 +48,9 @@ pub enum TlsServerError { #[snafu(display("failed to set safe TLS protocol versions"))] SetSafeTlsProtocolVersions { source: tokio_rustls::rustls::Error }, + + #[snafu(display("failed to run certificate rotation loop"))] + RunCertificateRotationLoop { source: tokio::task::JoinError }, } /// A server which terminates TLS connections and allows clients to communicate @@ -98,7 +101,8 @@ impl TlsServer { /// TLS stream get handled by a Hyper service, which in turn is an Axum /// router. 
pub async fn run(self) -> Result<()> { - tokio::spawn(async { Self::run_certificate_rotation_loop(self.cert_resolver).await }); + let certificate_rotation_loop = + tokio::spawn(async { Self::run_certificate_rotation_loop(self.cert_resolver).await }); let tls_acceptor = TlsAcceptor::from(Arc::new(self.config)); let tcp_listener = @@ -123,12 +127,21 @@ impl TlsServer { .router .into_make_service_with_connect_info::(); - pin_mut!(tcp_listener); + pin_mut!(certificate_rotation_loop); loop { let tls_acceptor = tls_acceptor.clone(); - // Wait for new tcp connection - let (tcp_stream, remote_addr) = match tcp_listener.accept().await { + // Wait for either a new TCP connection or the certificate rotation loop to exit + let tcp_stream = select! { + loop_result = &mut certificate_rotation_loop => { + return loop_result.context(RunCertificateRotationLoopSnafu)?; + } + tcp_stream = tcp_listener.accept() => { + tcp_stream + } + }; + + let (tcp_stream, remote_addr) = match tcp_stream { Ok((stream, addr)) => (stream, addr), Err(err) => { tracing::trace!(%err, "failed to accept incoming TCP connection"); From 3f8e74456cffce71a6c98a6fe49fc0058968f80c Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 2 Jul 2025 08:09:43 +0200 Subject: [PATCH 12/31] changelog --- crates/stackable-operator/CHANGELOG.md | 5 ++++- crates/stackable-webhook/CHANGELOG.md | 14 +++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/crates/stackable-operator/CHANGELOG.md b/crates/stackable-operator/CHANGELOG.md index 4a740e29a..5184f07d6 100644 --- a/crates/stackable-operator/CHANGELOG.md +++ b/crates/stackable-operator/CHANGELOG.md @@ -7,7 +7,9 @@ All notable changes to this project will be documented in this file. ### Changed - Update `kube` to `1.1.0` ([#1049]). -- BREAKING: Return type for `ListenerOperatorVolumeSourceBuilder::new()` is no onger a `Result` ([#1058]). 
+- BREAKING: Return type for `ListenerOperatorVolumeSourceBuilder::new()` is no longer a `Result` ([#1058]). +- BREAKING: Require two new CLI arguments: `--operator-namespace` and `-operator-service-name`. + These are required, so that the operator knows what Service it needs to enter as CRD conversion webhook ([#1066]). ### Fixed @@ -23,6 +25,7 @@ All notable changes to this project will be documented in this file. [#1058]: https://github.com/stackabletech/operator-rs/pull/1058 [#1060]: https://github.com/stackabletech/operator-rs/pull/1060 [#1064]: https://github.com/stackabletech/operator-rs/pull/1064 +[#1066]: https://github.com/stackabletech/operator-rs/pull/1066 ## [0.93.2] - 2025-05-26 diff --git a/crates/stackable-webhook/CHANGELOG.md b/crates/stackable-webhook/CHANGELOG.md index a7354362f..b1118e07a 100644 --- a/crates/stackable-webhook/CHANGELOG.md +++ b/crates/stackable-webhook/CHANGELOG.md @@ -4,6 +4,15 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- BREAKING: Re-write the `ConversionWebhookServer`. + It can now do CRD conversions, handle multiple CRDs and takes care of reconciling the CRDs ([#1066]). +- BREAKING: The `TlsServer` can now handle certificate rotation. + To achieve this, a new `CertificateResolver` was added. + Also, `TlsServer::new` now returns an additional `mpsc::Receiver`, so that the caller + can get notified about certificate rotations happening ([#1066]). + ### Fixed - Don't pull in the `aws-lc-rs` crate, as this currently fails to build in `make run-dev` ([#1043]). @@ -15,13 +24,16 @@ All notable changes to this project will be documented in this file. deployed to Kubernetes (e.g. conversion or mutating - which this crate targets) need to be accessible by it, which is not the case when only using loopback. Also, the constant `DEFAULT_SOCKET_ADDR` has been renamed to `DEFAULT_SOCKET_ADDRESS` ([#1045]). 
+- BREAKING: The `TlsServer` now requires you to pass SAN (subject alternative name) DNS entries, + so the caller will trust the issued certificate ([#1066]). [#1043]: https://github.com/stackabletech/operator-rs/pull/1043 [#1045]: https://github.com/stackabletech/operator-rs/pull/1045 +[#1066]: https://github.com/stackabletech/operator-rs/pull/1066 ## [0.3.1] - 2024-07-10 -## Changed +### Changed - Remove instrumentation of long running functions, add more granular instrumentation of futures. Adjust span and event levels ([#811]). - Bump rust-toolchain to 1.79.0 ([#822]). From 6f0040cf5c1956a41a3e9e94b0df822abbc69800 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 2 Jul 2025 08:10:43 +0200 Subject: [PATCH 13/31] link to decision --- crates/stackable-webhook/src/tls/cert_resolver.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/stackable-webhook/src/tls/cert_resolver.rs b/crates/stackable-webhook/src/tls/cert_resolver.rs index cd63d34a0..22ae89b9a 100644 --- a/crates/stackable-webhook/src/tls/cert_resolver.rs +++ b/crates/stackable-webhook/src/tls/cert_resolver.rs @@ -107,6 +107,7 @@ impl CertificateResolver { /// FIXME: This should *not* construct a CA cert and cert, but only a cert! /// This needs some changes in stackable-certs though. 
+ /// See https://github.com/stackabletech/decisions/issues/56 async fn generate_new_cert( subject_alterative_dns_names: Arc>, ) -> Result<(Certificate, Arc)> { From 941ee6b22b3810549ca804a63c0868c8a0feb4b6 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 2 Jul 2025 08:20:35 +0200 Subject: [PATCH 14/31] fix rustdocs --- crates/stackable-webhook/src/tls/cert_resolver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/stackable-webhook/src/tls/cert_resolver.rs b/crates/stackable-webhook/src/tls/cert_resolver.rs index 22ae89b9a..e8417f9bf 100644 --- a/crates/stackable-webhook/src/tls/cert_resolver.rs +++ b/crates/stackable-webhook/src/tls/cert_resolver.rs @@ -107,7 +107,7 @@ impl CertificateResolver { /// FIXME: This should *not* construct a CA cert and cert, but only a cert! /// This needs some changes in stackable-certs though. - /// See https://github.com/stackabletech/decisions/issues/56 + /// See [the relevant decision](https://github.com/stackabletech/decisions/issues/56) async fn generate_new_cert( subject_alterative_dns_names: Arc>, ) -> Result<(Certificate, Arc)> { From 2ec9b735c7e8c4a37e73cbfca21e93ad731cd992 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 09:06:40 +0200 Subject: [PATCH 15/31] Rework CLI structs and names --- crates/stackable-operator/CHANGELOG.md | 3 ++ crates/stackable-operator/src/cli.rs | 40 +++++++++---------- crates/stackable-operator/src/client.rs | 16 ++++---- .../src/utils/cluster_info.rs | 8 ++-- .../src/servers/conversion.rs | 10 ++--- 5 files changed, 40 insertions(+), 37 deletions(-) diff --git a/crates/stackable-operator/CHANGELOG.md b/crates/stackable-operator/CHANGELOG.md index 51004b12e..92da2a1a3 100644 --- a/crates/stackable-operator/CHANGELOG.md +++ b/crates/stackable-operator/CHANGELOG.md @@ -37,6 +37,9 @@ All notable changes to this project will be documented in this file. 
- BREAKING: Return type for `ListenerOperatorVolumeSourceBuilder::new()` is no longer a `Result` ([#1058]). - BREAKING: Require two new CLI arguments: `--operator-namespace` and `-operator-service-name`. These are required, so that the operator knows what Service it needs to enter as CRD conversion webhook ([#1066]). +- BREAKING: The `ProductOperatorRun` used for CLI arguments has some field renamed for consistency ([#1066]): + - `telemetry_arguments` -> `telemetry` + - `cluster_info_opts` -> `cluster_info` ### Fixed diff --git a/crates/stackable-operator/src/cli.rs b/crates/stackable-operator/src/cli.rs index 63ab0ea2d..b36ab9415 100644 --- a/crates/stackable-operator/src/cli.rs +++ b/crates/stackable-operator/src/cli.rs @@ -116,7 +116,7 @@ use product_config::ProductConfigManager; use snafu::{ResultExt, Snafu}; use stackable_telemetry::tracing::TelemetryOptions; -use crate::{namespace::WatchNamespace, utils::cluster_info::KubernetesClusterInfoOpts}; +use crate::{namespace::WatchNamespace, utils::cluster_info::KubernetesClusterInfoOptions}; pub const AUTHOR: &str = "Stackable GmbH - info@stackable.tech"; @@ -163,10 +163,10 @@ pub enum Command { /// Can be embedded into an extended argument set: /// /// ```rust -/// # use stackable_operator::cli::{Command, OperatorEnvironmentOpts, ProductOperatorRun, ProductConfigPath}; +/// # use stackable_operator::cli::{Command, OperatorEnvironmentOptions, ProductOperatorRun, ProductConfigPath}; +/// # use stackable_operator::{namespace::WatchNamespace, utils::cluster_info::KubernetesClusterInfoOptions}; +/// # use stackable_telemetry::tracing::TelemetryOptions; /// use clap::Parser; -/// use stackable_operator::{namespace::WatchNamespace, utils::cluster_info::KubernetesClusterInfoOpts}; -/// use stackable_telemetry::tracing::TelemetryOptions; /// /// #[derive(clap::Parser, Debug, PartialEq, Eq)] /// struct Run { @@ -197,12 +197,12 @@ pub enum Command { /// common: ProductOperatorRun { /// product_config: 
ProductConfigPath::from("bar".as_ref()), /// watch_namespace: WatchNamespace::One("foobar".to_string()), -/// telemetry_arguments: TelemetryOptions::default(), -/// cluster_info_opts: KubernetesClusterInfoOpts { +/// telemetry: TelemetryOptions::default(), +/// cluster_info: KubernetesClusterInfoOptions { /// kubernetes_cluster_domain: None, /// kubernetes_node_name: "baz".to_string(), /// }, -/// operator_environment: OperatorEnvironmentOpts { +/// operator_environment: OperatorEnvironmentOptions { /// operator_namespace: "stackable-operators".to_string(), /// operator_service_name: "foo-operator".to_string(), /// }, @@ -239,13 +239,13 @@ pub struct ProductOperatorRun { pub watch_namespace: WatchNamespace, #[command(flatten)] - pub operator_environment: OperatorEnvironmentOpts, + pub operator_environment: OperatorEnvironmentOptions, #[command(flatten)] - pub telemetry_arguments: TelemetryOptions, + pub telemetry: TelemetryOptions, #[command(flatten)] - pub cluster_info_opts: KubernetesClusterInfoOpts, + pub cluster_info: KubernetesClusterInfoOptions, } /// A path to a [`ProductConfigManager`] spec file @@ -304,7 +304,7 @@ impl ProductConfigPath { } #[derive(clap::Parser, Debug, PartialEq, Eq)] -pub struct OperatorEnvironmentOpts { +pub struct OperatorEnvironmentOptions { /// The namespace the operator is running in, usually `stackable-operators`. 
#[arg(long, env)] pub operator_namespace: String, @@ -440,12 +440,12 @@ mod tests { ProductOperatorRun { product_config: ProductConfigPath::from("bar".as_ref()), watch_namespace: WatchNamespace::One("foo".to_string()), - cluster_info_opts: KubernetesClusterInfoOpts { + cluster_info: KubernetesClusterInfoOptions { kubernetes_cluster_domain: None, kubernetes_node_name: "baz".to_string() }, - telemetry_arguments: Default::default(), - operator_environment: OperatorEnvironmentOpts { + telemetry: Default::default(), + operator_environment: OperatorEnvironmentOptions { operator_namespace: "stackable-operators".to_string(), operator_service_name: "foo-operator".to_string(), } @@ -469,12 +469,12 @@ mod tests { ProductOperatorRun { product_config: ProductConfigPath::from("bar".as_ref()), watch_namespace: WatchNamespace::All, - cluster_info_opts: KubernetesClusterInfoOpts { + cluster_info: KubernetesClusterInfoOptions { kubernetes_cluster_domain: None, kubernetes_node_name: "baz".to_string() }, - telemetry_arguments: Default::default(), - operator_environment: OperatorEnvironmentOpts { + telemetry: Default::default(), + operator_environment: OperatorEnvironmentOptions { operator_namespace: "stackable-operators".to_string(), operator_service_name: "foo-operator".to_string(), } @@ -493,12 +493,12 @@ mod tests { ProductOperatorRun { product_config: ProductConfigPath::from("bar".as_ref()), watch_namespace: WatchNamespace::One("foo".to_string()), - cluster_info_opts: KubernetesClusterInfoOpts { + cluster_info: KubernetesClusterInfoOptions { kubernetes_cluster_domain: None, kubernetes_node_name: "baz".to_string() }, - telemetry_arguments: Default::default(), - operator_environment: OperatorEnvironmentOpts { + telemetry: Default::default(), + operator_environment: OperatorEnvironmentOptions { operator_namespace: "stackable-operators".to_string(), operator_service_name: "foo-operator".to_string(), } diff --git a/crates/stackable-operator/src/client.rs 
b/crates/stackable-operator/src/client.rs index 5d493866e..f79a1eb91 100644 --- a/crates/stackable-operator/src/client.rs +++ b/crates/stackable-operator/src/client.rs @@ -21,7 +21,7 @@ use tracing::trace; use crate::{ kvp::LabelSelectorExt, - utils::cluster_info::{KubernetesClusterInfo, KubernetesClusterInfoOpts}, + utils::cluster_info::{KubernetesClusterInfo, KubernetesClusterInfoOptions}, }; pub type Result = std::result::Result; @@ -529,13 +529,13 @@ impl Client { /// use k8s_openapi::api::core::v1::Pod; /// use stackable_operator::{ /// client::{Client, initialize_operator}, - /// utils::cluster_info::KubernetesClusterInfoOpts, + /// utils::cluster_info::KubernetesClusterInfoOptions, /// }; /// /// #[tokio::main] /// async fn main() { - /// let cluster_info_opts = KubernetesClusterInfoOpts::parse(); - /// let client = initialize_operator(None, &cluster_info_opts) + /// let cluster_info_options = KubernetesClusterInfoOptions::parse(); + /// let client = initialize_operator(None, &cluster_info_options) /// .await /// .expect("Unable to construct client."); /// let watcher_config: watcher::Config = @@ -652,7 +652,7 @@ where pub async fn initialize_operator( field_manager: Option, - cluster_info_opts: &KubernetesClusterInfoOpts, + cluster_info_opts: &KubernetesClusterInfoOptions, ) -> Result { let kubeconfig: Config = kube::Config::infer() .await @@ -687,10 +687,10 @@ mod tests { }; use tokio::time::error::Elapsed; - use crate::utils::cluster_info::KubernetesClusterInfoOpts; + use crate::utils::cluster_info::KubernetesClusterInfoOptions; - async fn test_cluster_info_opts() -> KubernetesClusterInfoOpts { - KubernetesClusterInfoOpts { + async fn test_cluster_info_opts() -> KubernetesClusterInfoOptions { + KubernetesClusterInfoOptions { // We have to hard-code a made-up cluster domain, // since kubernetes_node_name (probably) won't be a valid Node that we can query. 
kubernetes_cluster_domain: Some( diff --git a/crates/stackable-operator/src/utils/cluster_info.rs b/crates/stackable-operator/src/utils/cluster_info.rs index 56c718f9e..b10fccc90 100644 --- a/crates/stackable-operator/src/utils/cluster_info.rs +++ b/crates/stackable-operator/src/utils/cluster_info.rs @@ -16,7 +16,7 @@ pub struct KubernetesClusterInfo { } #[derive(clap::Parser, Debug, PartialEq, Eq)] -pub struct KubernetesClusterInfoOpts { +pub struct KubernetesClusterInfoOptions { /// Kubernetes cluster domain, usually this is `cluster.local`. // We are not using a default value here, as we query the cluster if it is not specified. #[arg(long, env)] @@ -30,10 +30,10 @@ pub struct KubernetesClusterInfoOpts { impl KubernetesClusterInfo { pub async fn new( client: &Client, - cluster_info_opts: &KubernetesClusterInfoOpts, + cluster_info_opts: &KubernetesClusterInfoOptions, ) -> Result { let cluster_domain = match cluster_info_opts { - KubernetesClusterInfoOpts { + KubernetesClusterInfoOptions { kubernetes_cluster_domain: Some(cluster_domain), .. } => { @@ -41,7 +41,7 @@ impl KubernetesClusterInfo { cluster_domain.clone() } - KubernetesClusterInfoOpts { + KubernetesClusterInfoOptions { kubernetes_node_name: node_name, .. 
} => { diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index 5e8ef62b0..26f6e6d84 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -18,7 +18,7 @@ use kube::{ api::{Patch, PatchParams}, }; use snafu::{OptionExt, ResultExt, Snafu}; -use stackable_operator::cli::OperatorEnvironmentOpts; +use stackable_operator::cli::OperatorEnvironmentOptions; use tokio::{sync::mpsc, try_join}; use tracing::instrument; use x509_cert::{ @@ -75,7 +75,7 @@ pub struct ConversionWebhookServer { client: Client, field_manager: String, crds: Vec, - operator_environment: OperatorEnvironmentOpts, + operator_environment: OperatorEnvironmentOptions, } impl ConversionWebhookServer { @@ -144,7 +144,7 @@ impl ConversionWebhookServer { options: Options, client: Client, field_manager: impl Into + Debug, - operator_environment: OperatorEnvironmentOpts, + operator_environment: OperatorEnvironmentOptions, ) -> Result where H: WebhookHandler + Clone + Send + Sync + 'static, @@ -241,7 +241,7 @@ impl ConversionWebhookServer { client: &Client, field_manager: &str, crds: &[CustomResourceDefinition], - operator_environment: &OperatorEnvironmentOpts, + operator_environment: &OperatorEnvironmentOptions, ) -> Result<(), ConversionWebhookError> { while let Some(current_cert) = cert_rx.recv().await { Self::reconcile_crds( @@ -262,7 +262,7 @@ impl ConversionWebhookServer { client: &Client, field_manager: &str, crds: &[CustomResourceDefinition], - operator_environment: &OperatorEnvironmentOpts, + operator_environment: &OperatorEnvironmentOptions, current_cert: &Certificate, ) -> Result<(), ConversionWebhookError> { tracing::info!( From a8a2381161827631d6c028ce9de9f95340ed016b Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 09:39:36 +0200 Subject: [PATCH 16/31] test: Remove CLI parsing from env var test --- crates/stackable-operator/src/cli.rs | 33 
+--------------------------- 1 file changed, 1 insertion(+), 32 deletions(-) diff --git a/crates/stackable-operator/src/cli.rs b/crates/stackable-operator/src/cli.rs index b36ab9415..b480f330d 100644 --- a/crates/stackable-operator/src/cli.rs +++ b/crates/stackable-operator/src/cli.rs @@ -317,7 +317,7 @@ pub struct OperatorEnvironmentOptions { #[cfg(test)] mod tests { - use std::{env, fs::File}; + use std::fs::File; use clap::Parser; use rstest::*; @@ -328,10 +328,6 @@ mod tests { const USER_PROVIDED_PATH: &str = "user_provided_path_properties.yaml"; const DEPLOY_FILE_PATH: &str = "deploy_config_spec_properties.yaml"; const DEFAULT_FILE_PATH: &str = "default_file_path_properties.yaml"; - const WATCH_NAMESPACE: &str = "WATCH_NAMESPACE"; - const OPERATOR_NAMESPACE: &str = "OPERATOR_NAMESPACE"; - const OPERATOR_SERVICE_NAME: &str = "OPERATOR_SERVICE_NAME"; - const KUBERNETES_NODE_NAME: &str = "KUBERNETES_NODE_NAME"; #[test] fn verify_cli() { @@ -418,9 +414,6 @@ mod tests { #[test] fn product_operator_run_watch_namespace() { - // clean env var to not interfere if already set - unsafe { env::remove_var(WATCH_NAMESPACE) }; - // cli with namespace let opts = ProductOperatorRun::parse_from([ "run", @@ -480,29 +473,5 @@ mod tests { } } ); - - // env with namespace - unsafe { env::set_var(WATCH_NAMESPACE, "foo") }; - unsafe { env::set_var(OPERATOR_SERVICE_NAME, "foo-operator") }; - unsafe { env::set_var(OPERATOR_NAMESPACE, "stackable-operators") }; - unsafe { env::set_var(KUBERNETES_NODE_NAME, "baz") }; - - let opts = ProductOperatorRun::parse_from(["run", "--product-config", "bar"]); - assert_eq!( - opts, - ProductOperatorRun { - product_config: ProductConfigPath::from("bar".as_ref()), - watch_namespace: WatchNamespace::One("foo".to_string()), - cluster_info: KubernetesClusterInfoOptions { - kubernetes_cluster_domain: None, - kubernetes_node_name: "baz".to_string() - }, - telemetry: Default::default(), - operator_environment: OperatorEnvironmentOptions { - 
operator_namespace: "stackable-operators".to_string(), - operator_service_name: "foo-operator".to_string(), - } - } - ); } } From 90893852d82da54b8b9fad6e6f8b7d0fd6ec3401 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 10:11:15 +0200 Subject: [PATCH 17/31] Move subject_alterative_dns_names into Options --- Cargo.lock | 1 + crates/stackable-webhook/Cargo.toml | 3 +++ crates/stackable-webhook/src/lib.rs | 11 ++++---- crates/stackable-webhook/src/options.rs | 26 +++++++++++++++++++ .../src/servers/conversion.rs | 19 ++++++-------- crates/stackable-webhook/src/tls/mod.rs | 2 +- 6 files changed, 44 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 28e54a9a4..42e224734 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3101,6 +3101,7 @@ version = "0.3.1" dependencies = [ "arc-swap", "axum", + "clap", "futures-util", "hyper", "hyper-util", diff --git a/crates/stackable-webhook/Cargo.toml b/crates/stackable-webhook/Cargo.toml index 178e02e6c..ece1480da 100644 --- a/crates/stackable-webhook/Cargo.toml +++ b/crates/stackable-webhook/Cargo.toml @@ -29,3 +29,6 @@ tower.workspace = true tracing.workspace = true tracing-opentelemetry.workspace = true x509-cert.workspace = true + +[dev-dependencies] +clap.workspace = true diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 49335ed03..32a628417 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -13,7 +13,7 @@ //! use tokio::sync::mpsc; //! //! let router = Router::new(); -//! let server = WebhookServer::new(router, Options::default(), vec![]); +//! let server = WebhookServer::new(router, Options::default()); //! ``` //! //! 
For some usages, complete end-to-end [`WebhookServer`] implementations @@ -101,7 +101,7 @@ impl WebhookServer { /// use tokio::sync::mpsc; /// /// let router = Router::new(); - /// let server = WebhookServer::new(router, Options::default(), vec![]); + /// let server = WebhookServer::new(router, Options::default()); /// ``` /// /// ### Example with Custom Options @@ -113,16 +113,15 @@ impl WebhookServer { /// /// let options = Options::builder() /// .bind_address([127, 0, 0, 1], 8080) + /// .add_subject_alterative_dns_name("my-san-entry") /// .build(); - /// let sans = vec!["my-san-entry".to_string()]; /// /// let router = Router::new(); - /// let server = WebhookServer::new(router, options, sans); + /// let server = WebhookServer::new(router, options); /// ``` pub async fn new( router: Router, options: Options, - subject_alterative_dns_names: Vec, ) -> Result<(Self, mpsc::Receiver)> { tracing::trace!("create new webhook server"); @@ -147,7 +146,7 @@ impl WebhookServer { tracing::debug!("create TLS server"); let (tls_server, cert_rx) = - TlsServer::new(options.socket_addr, router, subject_alterative_dns_names) + TlsServer::new(options.socket_addr, router, options.subject_alterative_dns_names) .await .context(CreateTlsServerSnafu)?; diff --git a/crates/stackable-webhook/src/options.rs b/crates/stackable-webhook/src/options.rs index 99a01133e..666395ea0 100644 --- a/crates/stackable-webhook/src/options.rs +++ b/crates/stackable-webhook/src/options.rs @@ -41,6 +41,10 @@ pub struct Options { /// The default HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] /// binds to. pub socket_addr: SocketAddr, + + /// The subject alternative DNS names that should be added to the certificates generated for this + /// webhook. 
+ pub subject_alterative_dns_names: Vec, } impl Default for Options { @@ -66,6 +70,7 @@ impl Options { #[derive(Debug, Default)] pub struct OptionsBuilder { socket_addr: Option, + subject_alterative_dns_names: Vec, } impl OptionsBuilder { @@ -91,11 +96,32 @@ impl OptionsBuilder { self } + /// Sets the subject alternative DNS names that should be added to the certificates generated for + /// this webhook. + pub fn subject_alterative_dns_names( + mut self, + subject_alterative_dns_name: Vec, + ) -> Self { + self.subject_alterative_dns_names = subject_alterative_dns_name; + self + } + + /// Adds the subject alternative DNS name to the list of names. + pub fn add_subject_alterative_dns_name( + mut self, + subject_alterative_dns_name: impl Into, + ) -> Self { + self.subject_alterative_dns_names + .push(subject_alterative_dns_name.into()); + self + } + /// Builds the final [`Options`] by using default values for any not /// explicitly set option. pub fn build(self) -> Options { Options { socket_addr: self.socket_addr.unwrap_or(DEFAULT_SOCKET_ADDRESS), + subject_alterative_dns_names: self.subject_alterative_dns_names, } } } diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index 26f6e6d84..2bea2b863 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -95,11 +95,12 @@ impl ConversionWebhookServer { /// # Example /// /// ```no_run + /// use clap::Parser; /// use stackable_webhook::{ /// servers::{ConversionReview, ConversionWebhookServer}, /// Options /// }; - /// use stackable_operator::cli::OperatorEnvironmentOpts; + /// use stackable_operator::cli::OperatorEnvironmentOptions; /// use stackable_operator::kube::Client; /// use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion}; /// @@ -114,12 +115,7 @@ impl ConversionWebhookServer { /// /// const OPERATOR_NAME: &str = "PRODUCT_OPERATOR"; /// let client = 
Client::try_default().await.expect("failed to create Kubernetes client"); - /// // Normally you would get this from the CLI arguments in - /// // `ProductOperatorRun::operator_environment` - /// let operator_environment = OperatorEnvironmentOpts { - /// operator_namespace: "stackable-operator".to_string(), - /// operator_service_name: "product-operator".to_string(), - /// }; + /// let operator_environment = OperatorEnvironmentOptions::parse(); /// /// // Construct the conversion webhook server /// let conversion_webhook = ConversionWebhookServer::new( @@ -141,7 +137,7 @@ impl ConversionWebhookServer { )] pub async fn new( crds_and_handlers: impl IntoIterator, - options: Options, + mut options: Options, client: Client, field_manager: impl Into + Debug, operator_environment: OperatorEnvironmentOptions, @@ -167,13 +163,14 @@ impl ConversionWebhookServer { // This is how Kubernetes calls us, so it decides about the naming. // AFAIK we can not influence this, so this is the only SAN entry needed. 
- let sans = vec![format!( + let subject_alterative_dns_name = format!( "{service_name}.{operator_namespace}.svc", service_name = operator_environment.operator_service_name, operator_namespace = operator_environment.operator_namespace, - )]; + ); + options.subject_alterative_dns_names.push(subject_alterative_dns_name); - let (server, mut cert_rx) = WebhookServer::new(router, options, sans) + let (server, mut cert_rx) = WebhookServer::new(router, options) .await .context(CreateWebhookServerSnafu)?; diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index d1e1e4afc..0dbf349f6 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -65,7 +65,7 @@ pub struct TlsServer { impl TlsServer { #[instrument(name = "create_tls_server", skip(router))] - pub async fn new<'a>( + pub async fn new( socket_addr: SocketAddr, router: Router, subject_alterative_dns_names: Vec, From 4671a41fcfecfe63eacab0d0d8de065da038cec2 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 10:12:06 +0200 Subject: [PATCH 18/31] changelog --- crates/stackable-webhook/CHANGELOG.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/stackable-webhook/CHANGELOG.md b/crates/stackable-webhook/CHANGELOG.md index b1118e07a..c34b32519 100644 --- a/crates/stackable-webhook/CHANGELOG.md +++ b/crates/stackable-webhook/CHANGELOG.md @@ -24,8 +24,6 @@ All notable changes to this project will be documented in this file. deployed to Kubernetes (e.g. conversion or mutating - which this crate targets) need to be accessible by it, which is not the case when only using loopback. Also, the constant `DEFAULT_SOCKET_ADDR` has been renamed to `DEFAULT_SOCKET_ADDRESS` ([#1045]). -- BREAKING: The `TlsServer` now requires you to pass SAN (subject alternative name) DNS entries, - so the caller will trust the issued certificate ([#1066]). 
[#1043]: https://github.com/stackabletech/operator-rs/pull/1043 [#1045]: https://github.com/stackabletech/operator-rs/pull/1045 From a3b3cc1bd5c4f1d426337aa2be4a3b7af5cc6b8e Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 10:12:48 +0200 Subject: [PATCH 19/31] Remove mpsc in tests leftover --- crates/stackable-webhook/src/lib.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 32a628417..233a54a35 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -10,7 +10,6 @@ //! ``` //! use stackable_webhook::{WebhookServer, Options}; //! use axum::Router; -//! use tokio::sync::mpsc; //! //! let router = Router::new(); //! let server = WebhookServer::new(router, Options::default()); @@ -98,7 +97,6 @@ impl WebhookServer { /// ``` /// use stackable_webhook::{WebhookServer, Options}; /// use axum::Router; - /// use tokio::sync::mpsc; /// /// let router = Router::new(); /// let server = WebhookServer::new(router, Options::default()); @@ -109,7 +107,6 @@ impl WebhookServer { /// ``` /// use stackable_webhook::{WebhookServer, Options}; /// use axum::Router; - /// use tokio::sync::mpsc; /// /// let options = Options::builder() /// .bind_address([127, 0, 0, 1], 8080) From f8683e4d1e78f29df282c3cc77419ec806497cfe Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 10:18:55 +0200 Subject: [PATCH 20/31] docs: Use result of WebhookServer::new --- crates/stackable-webhook/src/lib.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 233a54a35..06607866d 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -11,8 +11,12 @@ //! use stackable_webhook::{WebhookServer, Options}; //! use axum::Router; //! +//! # async fn test() { //! let router = Router::new(); -//! 
let server = WebhookServer::new(router, Options::default()); +//! let (server, cert_rx) = WebhookServer::new(router, Options::default()) +//! .await +//! .expect("failed to create WebhookServer"); +//! # } //! ``` //! //! For some usages, complete end-to-end [`WebhookServer`] implementations @@ -98,8 +102,12 @@ impl WebhookServer { /// use stackable_webhook::{WebhookServer, Options}; /// use axum::Router; /// + /// # async fn test() { /// let router = Router::new(); - /// let server = WebhookServer::new(router, Options::default()); + /// let (server, cert_rx) = WebhookServer::new(router, Options::default()) + /// .await + /// .expect("failed to create WebhookServer"); + /// # } /// ``` /// /// ### Example with Custom Options @@ -108,13 +116,17 @@ impl WebhookServer { /// use stackable_webhook::{WebhookServer, Options}; /// use axum::Router; /// + /// # async fn test() { /// let options = Options::builder() /// .bind_address([127, 0, 0, 1], 8080) /// .add_subject_alterative_dns_name("my-san-entry") /// .build(); /// /// let router = Router::new(); - /// let server = WebhookServer::new(router, options); + /// let (server, cert_rx) = WebhookServer::new(router, options) + /// .await + /// .expect("failed to create WebhookServer"); + /// # } /// ``` pub async fn new( router: Router, From 457dbf39e098b4aa0f42b4c37aa63077275b38cd Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 10:20:09 +0200 Subject: [PATCH 21/31] fmt --- crates/stackable-webhook/src/lib.rs | 11 +++++++---- crates/stackable-webhook/src/servers/conversion.rs | 4 +++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 06607866d..c37c423f2 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -154,10 +154,13 @@ impl WebhookServer { .route("/health", get(|| async { "ok" })); tracing::debug!("create TLS server"); - let (tls_server, cert_rx) = - 
TlsServer::new(options.socket_addr, router, options.subject_alterative_dns_names) - .await - .context(CreateTlsServerSnafu)?; + let (tls_server, cert_rx) = TlsServer::new( + options.socket_addr, + router, + options.subject_alterative_dns_names, + ) + .await + .context(CreateTlsServerSnafu)?; Ok((Self { tls_server }, cert_rx)) } diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index 2bea2b863..bb06bcc70 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -168,7 +168,9 @@ impl ConversionWebhookServer { service_name = operator_environment.operator_service_name, operator_namespace = operator_environment.operator_namespace, ); - options.subject_alterative_dns_names.push(subject_alterative_dns_name); + options + .subject_alterative_dns_names + .push(subject_alterative_dns_name); let (server, mut cert_rx) = WebhookServer::new(router, options) .await From a39ee4722e1c084f330ff7e9c7af05a3bc1623be Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 10:27:56 +0200 Subject: [PATCH 22/31] Update crates/stackable-webhook/src/tls/mod.rs Co-authored-by: Techassi --- crates/stackable-webhook/src/tls/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index 0dbf349f6..be91164bb 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -71,11 +71,11 @@ impl TlsServer { subject_alterative_dns_names: Vec, ) -> Result<(Self, mpsc::Receiver)> { let (cert_tx, cert_rx) = mpsc::channel(1); - let cert_resolver = Arc::new( - CertificateResolver::new(subject_alterative_dns_names, cert_tx) + + let cert_resolver = CertificateResolver::new(subject_alterative_dns_names, cert_tx) .await - .context(CreateCertificateResolverSnafu)?, - ); + .context(CreateCertificateResolverSnafu)? 
+ let cert_resolver = Arc::new(cert_resolver); let tls_provider = default_provider(); let mut config = ServerConfig::builder_with_provider(tls_provider.into()) From 6210262075dcbe6aff302c9e5af9a167472bccd2 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 10:29:43 +0200 Subject: [PATCH 23/31] fix suggestion --- crates/stackable-webhook/src/tls/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index be91164bb..f53b295c5 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -73,8 +73,8 @@ impl TlsServer { let (cert_tx, cert_rx) = mpsc::channel(1); let cert_resolver = CertificateResolver::new(subject_alterative_dns_names, cert_tx) - .await - .context(CreateCertificateResolverSnafu)? + .await + .context(CreateCertificateResolverSnafu)?; let cert_resolver = Arc::new(cert_resolver); let tls_provider = default_provider(); From dcb22bde4d92f4535eafeffcd4ada8a90cece547 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 10:32:14 +0200 Subject: [PATCH 24/31] capture in variable --- crates/stackable-webhook/src/tls/mod.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index f53b295c5..b3d3d0571 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -85,15 +85,14 @@ impl TlsServer { .with_cert_resolver(cert_resolver.clone()); config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; - Ok(( - Self { - config, - cert_resolver, - socket_addr, - router, - }, - cert_rx, - )) + let tls_server = Self { + config, + cert_resolver, + socket_addr, + router, + }; + + Ok((tls_server, cert_rx)) } /// Runs the TLS server by listening for incoming TCP connections on the From f1fee5d43dcb3d4819a7a311853c0a288f2a5c41 Mon Sep 
17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 10:39:21 +0200 Subject: [PATCH 25/31] Move route into variable --- crates/stackable-webhook/src/servers/conversion.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index bb06bcc70..03fc3b23c 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -157,7 +157,8 @@ impl ConversionWebhookServer { Json(review) }; - router = router.route(&format!("/convert/{crd_name}"), post(handler_fn)); + let route = format!("/convert/{crd_name}"); + router = router.route(&route, post(handler_fn)); crds.push(crd); } From caddffb612770aa7f7399258e94b1cc0b36b852d Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 10:45:03 +0200 Subject: [PATCH 26/31] docs: Mention background cert rotation --- crates/stackable-webhook/src/tls/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index b3d3d0571..f920258b9 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -55,6 +55,8 @@ pub enum TlsServerError { /// A server which terminates TLS connections and allows clients to communicate /// via HTTPS with the underlying HTTP router. +/// +/// It also rotates the generated certificates as needed. pub struct TlsServer { config: ServerConfig, cert_resolver: Arc, @@ -99,6 +101,8 @@ impl TlsServer { /// bound socket address. It only accepts TLS connections. Internally each /// TLS stream get handled by a Hyper service, which in turn is an Axum /// router. + /// + /// It also starts a background task to rotate the certificate as needed. 
pub async fn run(self) -> Result<()> { let certificate_rotation_loop = tokio::spawn(async { Self::run_certificate_rotation_loop(self.cert_resolver).await }); From 7450d85040aa4505eccb51da4620e6e707b7d2f0 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 10:55:37 +0200 Subject: [PATCH 27/31] docs: Document TlsServer::new --- crates/stackable-webhook/src/tls/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index f920258b9..560331b7c 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -66,6 +66,11 @@ pub struct TlsServer { } impl TlsServer { + /// Create a new [`TlsServer`]. + /// + /// This creates a [`CertificateResolver`] with the provided `subject_alterative_dns_names`, + /// which takes care of the certificate rotation. Afterwards it creates the [`ServerConfig`], + /// which lets the [`CertificateResolver`] provide the needed certificates. #[instrument(name = "create_tls_server", skip(router))] pub async fn new( socket_addr: SocketAddr, router: Router, subject_alterative_dns_names: Vec, From ac29c0eb7051d35b526d0f72ba827229c25894c8 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 12:23:50 +0200 Subject: [PATCH 28/31] docs: Hint on downward API env var --- crates/stackable-operator/src/cli.rs | 4 ++++ crates/stackable-operator/src/utils/cluster_info.rs | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/crates/stackable-operator/src/cli.rs b/crates/stackable-operator/src/cli.rs index b480f330d..b19fb3693 100644 --- a/crates/stackable-operator/src/cli.rs +++ b/crates/stackable-operator/src/cli.rs @@ -306,6 +306,10 @@ impl ProductConfigPath { #[derive(clap::Parser, Debug, PartialEq, Eq)] pub struct OperatorEnvironmentOptions { /// The namespace the operator is running in, usually `stackable-operators`. 
+ /// + /// Note that when running the operator on Kubernetes we recommend using the + /// [downward API](https://kubernetes.io/docs/concepts/workloads/pods/downward-api/) + /// to let Kubernetes expose the namespace as the `OPERATOR_NAMESPACE` env variable. #[arg(long, env)] pub operator_namespace: String, diff --git a/crates/stackable-operator/src/utils/cluster_info.rs b/crates/stackable-operator/src/utils/cluster_info.rs index b10fccc90..0cc92e9e9 100644 --- a/crates/stackable-operator/src/utils/cluster_info.rs +++ b/crates/stackable-operator/src/utils/cluster_info.rs @@ -23,6 +23,10 @@ pub struct KubernetesClusterInfoOptions { pub kubernetes_cluster_domain: Option, /// Name of the Kubernetes Node that the operator is running on. + /// + /// Note that when running the operator on Kubernetes we recommend using the + /// [downward API](https://kubernetes.io/docs/concepts/workloads/pods/downward-api/) + /// to let Kubernetes expose the node name as the `KUBERNETES_NODE_NAME` env variable. #[arg(long, env)] pub kubernetes_node_name: String, } From 51321f089b2d7915e96ab750f11eb0be4fe33d1a Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 16:08:17 +0200 Subject: [PATCH 29/31] Rewrite certificate rotation logic Co-authored-by: Techassi --- crates/stackable-webhook/src/tls/mod.rs | 223 ++++++++++++------------ 1 file changed, 113 insertions(+), 110 deletions(-) diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index 560331b7c..9bfcd6562 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -2,15 +2,21 @@ //! server, which can be used in combination with an Axum [`Router`]. 
use std::{net::SocketAddr, sync::Arc}; -use axum::{Router, extract::Request}; +use axum::{ + Router, + extract::{ConnectInfo, Request}, + middleware::AddExtension, +}; use cert_resolver::{CertificateResolver, CertificateResolverError}; -use futures_util::pin_mut; use hyper::{body::Incoming, service::service_fn}; use hyper_util::rt::{TokioExecutor, TokioIo}; use opentelemetry::trace::{FutureExt, SpanKind}; use snafu::{ResultExt, Snafu}; use stackable_operator::time::Duration; -use tokio::{net::TcpListener, select, sync::mpsc, time::interval}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::mpsc, +}; use tokio_rustls::{ TlsAcceptor, rustls::{ @@ -48,9 +54,6 @@ pub enum TlsServerError { #[snafu(display("failed to set safe TLS protocol versions"))] SetSafeTlsProtocolVersions { source: tokio_rustls::rustls::Error }, - - #[snafu(display("failed to run certificate rotation loop"))] - RunCertificateRotationLoop { source: tokio::task::JoinError }, } /// A server which terminates TLS connections and allows clients to communicate @@ -109,8 +112,8 @@ impl TlsServer { /// /// It also starts a background task to rotate the certificate as needed. pub async fn run(self) -> Result<()> { - let certificate_rotation_loop = - tokio::spawn(async { Self::run_certificate_rotation_loop(self.cert_resolver).await }); + let start = tokio::time::Instant::now() + *WEBHOOK_CERTIFICATE_ROTATION_INTERVAL; + let mut interval = tokio::time::interval_at(start, *WEBHOOK_CERTIFICATE_ROTATION_INTERVAL); let tls_acceptor = TlsAcceptor::from(Arc::new(self.config)); let tcp_listener = @@ -135,123 +138,123 @@ impl TlsServer { .router .into_make_service_with_connect_info::(); - pin_mut!(certificate_rotation_loop); loop { let tls_acceptor = tls_acceptor.clone(); - // Wait for either a new TCP connection or the certificate rotation loop to exit - let tcp_stream = select! 
{ - loop_result = &mut certificate_rotation_loop => { - return loop_result.context(RunCertificateRotationLoopSnafu)?; - } - tcp_stream = tcp_listener.accept() => { - tcp_stream - } - }; - - let (tcp_stream, remote_addr) = match tcp_stream { - Ok((stream, addr)) => (stream, addr), - Err(err) => { - tracing::trace!(%err, "failed to accept incoming TCP connection"); - continue; + // Wait for either a new TCP connection or the certificate rotation interval tick + tokio::select! { + // We opt for a biased execution of arms to make sure we always check if the + // certificate needs rotation based on the interval. This ensures, we always use + // a valid certificate for the TLS connection. + biased; + + // This is cancellation-safe. If this branch is cancelled, the tick is NOT consumed. + // As such, we will not miss rotating the certificate. + _ = interval.tick() => { + self.cert_resolver + .rotate_certificate() + .await + .context(RotateCertificateSnafu)? } - }; - // Here, the connect info is extracted by calling Tower's Service - // trait function on `IntoMakeServiceWithConnectInfo` - let tower_service = router.call(remote_addr).await.unwrap(); - - let span = tracing::debug_span!("accept tcp connection"); - tokio::spawn( - async move { - let span = tracing::trace_span!( - "accept tls connection", - "otel.kind" = ?SpanKind::Server, - "otel.status_code" = Empty, - "otel.status_message" = Empty, - "client.address" = remote_addr.ip().to_string(), - "client.port" = remote_addr.port() as i64, - "server.address" = Empty, - "server.port" = Empty, - "network.peer.address" = remote_addr.ip().to_string(), - "network.peer.port" = remote_addr.port() as i64, - "network.local.address" = Empty, - "network.local.port" = Empty, - "network.transport" = "tcp", - "network.type" = self.socket_addr.semantic_convention_network_type(), - ); - - if let Ok(local_addr) = tcp_stream.local_addr() { - let addr = &local_addr.ip().to_string(); - let port = local_addr.port(); - 
span.record("server.address", addr) - .record("server.port", port as i64) - .record("network.local.address", addr) - .record("network.local.port", port as i64); - } - - // Wait for tls handshake to happen - let tls_stream = match tls_acceptor - .accept(tcp_stream) - .instrument(span.clone()) - .await - { - Ok(tls_stream) => tls_stream, + // This is cancellation-safe. If cancelled, no new connections are accepted. + tcp_connection = tcp_listener.accept() => { + let (tcp_stream, remote_addr) = match tcp_connection { + Ok((stream, addr)) => (stream, addr), Err(err) => { - span.record("otel.status_code", "Error") - .record("otel.status_message", err.to_string()); - tracing::trace!(%remote_addr, "error during tls handshake connection"); - return; + tracing::trace!(%err, "failed to accept incoming TCP connection"); + continue; } }; - // Hyper has its own `AsyncRead` and `AsyncWrite` traits and doesn't use tokio. - // `TokioIo` converts between them. - let tls_stream = TokioIo::new(tls_stream); - - // Hyper also has its own `Service` trait and doesn't use tower. We can use - // `hyper::service::service_fn` to create a hyper `Service` that calls our app through - // `tower::Service::call`. 
- let hyper_service = service_fn(move |request: Request| { - // This carries the current context with the trace id so that the TraceLayer can use that as a parent - let otel_context = Span::current().context(); - // We need to clone here, because oneshot consumes self - tower_service - .clone() - .oneshot(request) - .with_context(otel_context) - }); - - let span = tracing::debug_span!("serve connection"); - hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()) - .serve_connection_with_upgrades(tls_stream, hyper_service) - .instrument(span.clone()) - .await - .unwrap_or_else(|err| { - span.record("otel.status_code", "Error") - .record("otel.status_message", err.to_string()); - tracing::warn!(%err, %remote_addr, "failed to serve connection"); - }) + // Here, the connect info is extracted by calling Tower's Service + // trait function on `IntoMakeServiceWithConnectInfo` + let tower_service = router.call(remote_addr).await.unwrap(); + + let span = tracing::debug_span!("accept tcp connection"); + tokio::spawn(async move { + Self::handle_request(tcp_stream, remote_addr, tls_acceptor, tower_service, self.socket_addr) + }.instrument(span)); } - .instrument(span), - ); + }; } } - async fn run_certificate_rotation_loop(cert_resolver: Arc) -> Result<()> { - let mut interval = interval(*WEBHOOK_CERTIFICATE_ROTATION_INTERVAL); - // Let the interval tick once, so that the first loop iteration does not start immediately, - // thus generating a new cert. 
- interval.tick().await; + async fn handle_request( + tcp_stream: TcpStream, + remote_addr: SocketAddr, + tls_acceptor: TlsAcceptor, + tower_service: AddExtension>, + socket_addr: SocketAddr, + ) { + let span = tracing::trace_span!( + "accept tls connection", + "otel.kind" = ?SpanKind::Server, + "otel.status_code" = Empty, + "otel.status_message" = Empty, + "client.address" = remote_addr.ip().to_string(), + "client.port" = remote_addr.port() as i64, + "server.address" = Empty, + "server.port" = Empty, + "network.peer.address" = remote_addr.ip().to_string(), + "network.peer.port" = remote_addr.port() as i64, + "network.local.address" = Empty, + "network.local.port" = Empty, + "network.transport" = "tcp", + "network.type" = socket_addr.semantic_convention_network_type(), + ); + + if let Ok(local_addr) = tcp_stream.local_addr() { + let addr = &local_addr.ip().to_string(); + let port = local_addr.port(); + span.record("server.address", addr) + .record("server.port", port as i64) + .record("network.local.address", addr) + .record("network.local.port", port as i64); + } - loop { - interval.tick().await; + // Wait for tls handshake to happen + let tls_stream = match tls_acceptor + .accept(tcp_stream) + .instrument(span.clone()) + .await + { + Ok(tls_stream) => tls_stream, + Err(err) => { + span.record("otel.status_code", "Error") + .record("otel.status_message", err.to_string()); + tracing::trace!(%remote_addr, "error during tls handshake connection"); + return; + } + }; - cert_resolver - .rotate_certificate() - .await - .context(RotateCertificateSnafu)?; - } + // Hyper has its own `AsyncRead` and `AsyncWrite` traits and doesn't use tokio. + // `TokioIo` converts between them. + let tls_stream = TokioIo::new(tls_stream); + + // Hyper also has its own `Service` trait and doesn't use tower. We can use + // `hyper::service::service_fn` to create a hyper `Service` that calls our app through + // `tower::Service::call`. 
+ let hyper_service = service_fn(move |request: Request| { + // This carries the current context with the trace id so that the TraceLayer can use that as a parent + let otel_context = Span::current().context(); + // We need to clone here, because oneshot consumes self + tower_service + .clone() + .oneshot(request) + .with_context(otel_context) + }); + + let span = tracing::debug_span!("serve connection"); + hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()) + .serve_connection_with_upgrades(tls_stream, hyper_service) + .instrument(span.clone()) + .await + .unwrap_or_else(|err| { + span.record("otel.status_code", "Error") + .record("otel.status_message", err.to_string()); + tracing::warn!(%err, %remote_addr, "failed to serve connection"); + }) } } From 082a59d18941ec16de348f0f0a3a9716a6258a12 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 16:19:48 +0200 Subject: [PATCH 30/31] Avoid unwrap --- crates/stackable-webhook/src/tls/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index 9bfcd6562..114319835 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -1,6 +1,6 @@ //! This module contains structs and functions to easily create a TLS termination //! server, which can be used in combination with an Axum [`Router`]. 
-use std::{net::SocketAddr, sync::Arc}; +use std::{convert::Infallible, net::SocketAddr, sync::Arc}; use axum::{ Router, @@ -169,7 +169,8 @@ impl TlsServer { // Here, the connect info is extracted by calling Tower's Service // trait function on `IntoMakeServiceWithConnectInfo` - let tower_service = router.call(remote_addr).await.unwrap(); + let tower_service: Result<_, Infallible> = router.call(remote_addr).await; + let tower_service = tower_service.expect("Infallible error can never happen"); let span = tracing::debug_span!("accept tcp connection"); tokio::spawn(async move { From c0326339d5a06ea1090ac6921694c39dfb116338 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 4 Jul 2025 16:27:42 +0200 Subject: [PATCH 31/31] Make CertResolver pub, so docs pass --- crates/stackable-webhook/src/tls/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index 114319835..d1cb10b95 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -7,7 +7,6 @@ use axum::{ extract::{ConnectInfo, Request}, middleware::AddExtension, }; -use cert_resolver::{CertificateResolver, CertificateResolverError}; use hyper::{body::Incoming, service::service_fn}; use hyper_util::rt::{TokioExecutor, TokioIo}; use opentelemetry::trace::{FutureExt, SpanKind}; @@ -32,6 +31,8 @@ use x509_cert::Certificate; mod cert_resolver; +pub use cert_resolver::{CertificateResolver, CertificateResolverError}; + pub const WEBHOOK_CA_LIFETIME: Duration = Duration::from_minutes_unchecked(3); pub const WEBHOOK_CERTIFICATE_LIFETIME: Duration = Duration::from_minutes_unchecked(2); pub const WEBHOOK_CERTIFICATE_ROTATION_INTERVAL: Duration = Duration::from_minutes_unchecked(1);