diff --git a/.gitignore b/.gitignore index 7a658bdbc..9dd4db640 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ src/protos/*.rs # Keep secrets here /.cloud_certs/ cloud_envs.fish +/.claude/settings.local.json diff --git a/AGENTS.md b/AGENTS.md index 2ec9f27d3..ac56f89be 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -15,6 +15,7 @@ document as your quick reference when submitting pull requests. - `tests/` – integration, heavy, and manual tests - `arch_docs/` – architectural design documents - Contributor guide: `README.md` +- `target/` - This contains compiled files. You never need to look in here. ## Repo Specific Utilities @@ -38,6 +39,9 @@ cargo integ-test # integration tests (starts ephemeral server by default) cargo test --test heavy_tests # load tests -- agents do not need to run this and should not ``` +Rust compilation can take some time. Do not interrupt builds or tests unless they are taking more +than 10 minutes. + Additional checks: ```bash @@ -61,6 +65,8 @@ Documentation can be generated with `cargo doc`. Reviewers will look for: - All builds, tests, and lints passing in CI + - Note that some tests cause intentional panics. That does not mean the test failed. You should + only consider tests that have failed according to the harness to be a real problem. - New tests covering behavior changes - Clear and concise code following existing style (see `README.md` for error handling guidance) - Documentation updates for any public API changes diff --git a/Cargo.toml b/Cargo.toml index 0dd3e7749..2c2ae0f9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,11 +8,11 @@ license-file = "LICENSE.txt" [workspace.dependencies] derive_builder = "0.20" -derive_more = { version = "2.0", features = ["constructor", "display", "from", "into", "debug"] } +derive_more = { version = "2.0", features = ["constructor", "display", "from", "into", "debug", "try_into"] } thiserror = "2" -tonic = "0.12" -tonic-build = "0.12" -opentelemetry = { version = "0.29", features = ["metrics"] } +tonic = "0.13" +tonic-build = "0.13" +opentelemetry = { version = "0.30", features = ["metrics"] } prost = "0.13" prost-types = "0.13" diff --git a/client/Cargo.toml b/client/Cargo.toml index ba77fbbfa..811abff87 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -20,9 +20,10 @@ backoff = "0.4" base64 = "0.22" derive_builder = { workspace = true } derive_more = { workspace = true } +bytes = "1.10" futures-util = { version = "0.3", default-features = false } futures-retry = "0.6.0" -http = "1.1.0" +http = "1.1" http-body-util = "0.1" hyper = { version = "1.4.1" } hyper-util = "0.1.6" @@ -31,7 +32,7 @@ parking_lot = "0.12" slotmap = "1.0" thiserror = { workspace = true } tokio = "1.1" -tonic = { workspace = true, features = ["tls", "tls-roots"] } +tonic = { workspace = true, features = ["tls-ring", "tls-native-roots"] } tower = { version = "0.5", features = ["util"] } tracing = "0.1" url = "2.2" diff --git a/client/src/lib.rs b/client/src/lib.rs index 8517b5a41..cf701eca5 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -72,7 +72,7 @@ use temporal_sdk_core_protos::{ }; use tonic::{ Code, - body::BoxBody, + body::Body, client::GrpcService, codegen::InterceptedService, metadata::{MetadataKey, MetadataMap, MetadataValue}, @@ -595,7 +595,7 @@ fn get_decode_max_size() -> usize { impl TemporalServiceClient where T: Clone, - T: GrpcService + Send + Clone + 'static, + T: GrpcService + Send + Clone + 'static, T::ResponseBody: tonic::codegen::Body + Send + 'static, T::Error: Into, ::Error: Into + Send, @@ -1175,13 +1175,13 @@ impl From for Priority { impl WorkflowClientTrait for T where T: RawClientLike + NamespacedClient + Clone + Send + Sync + 'static, - ::SvcType: GrpcService + Send + Clone + 'static, - <::SvcType as GrpcService>::ResponseBody: + ::SvcType: GrpcService + Send + Clone + 'static, + <::SvcType as GrpcService>::ResponseBody: tonic::codegen::Body + Send + 'static, - <::SvcType as GrpcService>::Error: + <::SvcType as GrpcService>::Error: Into, - <::SvcType as GrpcService>::Future: Send, - <<::SvcType as GrpcService>::ResponseBody + <::SvcType as GrpcService>::Future: Send, + <<::SvcType as GrpcService>::ResponseBody as tonic::codegen::Body>::Error: Into + Send, { async fn start_workflow( @@ -1673,6 +1673,15 @@ impl RequestExt for tonic::Request { } } +macro_rules! dbg_panic { + ($($arg:tt)*) => { + use tracing::error; + error!($($arg)*); + debug_assert!(false, $($arg)*); + }; +} +pub(crate) use dbg_panic; + #[cfg(test)] mod tests { use super::*; diff --git a/client/src/metrics.rs b/client/src/metrics.rs index 935b461c2..83899a527 100644 --- a/client/src/metrics.rs +++ b/client/src/metrics.rs @@ -1,4 +1,4 @@ -use crate::{AttachMetricLabels, CallType}; +use crate::{AttachMetricLabels, CallType, dbg_panic}; use futures_util::{FutureExt, future::BoxFuture}; use std::{ sync::Arc, @@ -6,10 +6,10 @@ use std::{ time::{Duration, Instant}, }; use temporal_sdk_core_api::telemetry::metrics::{ - CoreMeter, Counter, HistogramDuration, MetricAttributes, MetricKeyValue, MetricParameters, - TemporalMeter, + CoreMeter, Counter, CounterBase, HistogramDuration, HistogramDurationBase, MetricAttributable, + MetricAttributes, MetricKeyValue, MetricParameters, TemporalMeter, }; -use tonic::{Code, body::BoxBody, transport::Channel}; +use tonic::{Code, body::Body, transport::Channel}; use tower::Service; /// The string name (which may be prefixed) for this metric @@ -26,22 +26,24 @@ pub(crate) struct MetricsContext { meter: Arc, kvs: MetricAttributes, poll_is_long: bool, + instruments: Instruments, +} +#[derive(Clone)] +struct Instruments { + svc_request: Counter, + svc_request_failed: Counter, + long_svc_request: Counter, + long_svc_request_failed: Counter, - svc_request: Arc, - svc_request_failed: Arc, - long_svc_request: Arc, - long_svc_request_failed: Arc, - - svc_request_latency: Arc, - long_svc_request_latency: Arc, + svc_request_latency: HistogramDuration, + long_svc_request_latency: HistogramDuration, } impl MetricsContext { pub(crate) fn new(tm: TemporalMeter) -> Self { let meter = tm.inner; - Self { - kvs: meter.new_attributes(tm.default_attribs), - poll_is_long: false, + let kvs = meter.new_attributes(tm.default_attribs); + let instruments = Instruments { svc_request: meter.counter(MetricParameters { name: "request".into(), description: "Count of client request successes by rpc name".into(), @@ -72,6 +74,11 @@ impl MetricsContext { unit: "duration".into(), description: "Histogram of client long-poll request latencies".into(), }), + }; + Self { + kvs, + poll_is_long: false, + instruments, meter, } } @@ -81,6 +88,33 @@ impl MetricsContext { self.kvs = self .meter .extend_attributes(self.kvs.clone(), new_kvs.into()); + + let _ = self + .instruments + .svc_request + .with_attributes(&self.kvs) + .and_then(|v| { + self.instruments.svc_request = v; + self.instruments.long_svc_request.with_attributes(&self.kvs) + }) + .and_then(|v| { + self.instruments.long_svc_request = v; + self.instruments + .svc_request_latency + .with_attributes(&self.kvs) + }) + .and_then(|v| { + self.instruments.svc_request_latency = v; + self.instruments + .long_svc_request_latency + .with_attributes(&self.kvs) + }) + .map(|v| { + self.instruments.long_svc_request_latency = v; + }) + .inspect_err(|e| { + dbg_panic!("Failed to extend client metrics attributes: {:?}", e); + }); } pub(crate) fn set_is_long_poll(&mut self) { @@ -90,9 +124,9 @@ impl MetricsContext { /// A request to the temporal service was made pub(crate) fn svc_request(&self) { if self.poll_is_long { - self.long_svc_request.add(1, &self.kvs); + self.instruments.long_svc_request.adds(1); } else { - self.svc_request.add(1, &self.kvs); + self.instruments.svc_request.adds(1); } } @@ -108,18 +142,18 @@ impl MetricsContext { &self.kvs }; if self.poll_is_long { - self.long_svc_request_failed.add(1, kvs); + self.instruments.long_svc_request_failed.add(1, kvs); } else { - self.svc_request_failed.add(1, kvs); + self.instruments.svc_request_failed.add(1, kvs); } } /// Record service request latency pub(crate) fn record_svc_req_latency(&self, dur: Duration) { if self.poll_is_long { - self.long_svc_request_latency.record(dur, &self.kvs); + self.instruments.long_svc_request_latency.records(dur); } else { - self.svc_request_latency.record(dur, &self.kvs); + self.instruments.svc_request_latency.records(dur); } } } @@ -177,8 +211,8 @@ pub struct GrpcMetricSvc { pub(crate) disable_errcode_label: bool, } -impl Service> for GrpcMetricSvc { - type Response = http::Response; +impl Service> for GrpcMetricSvc { + type Response = http::Response; type Error = tonic::transport::Error; type Future = BoxFuture<'static, Result>; @@ -186,7 +220,7 @@ impl Service> for GrpcMetricSvc { self.inner.poll_ready(cx).map_err(Into::into) } - fn call(&mut self, mut req: http::Request) -> Self::Future { + fn call(&mut self, mut req: http::Request) -> Self::Future { let metrics = self .metrics .clone() diff --git a/client/src/raw.rs b/client/src/raw.rs index d5bdb791f..e7aae76bf 100644 --- a/client/src/raw.rs +++ b/client/src/raw.rs @@ -24,7 +24,7 @@ use temporal_sdk_core_protos::{ }; use tonic::{ Request, Response, Status, - body::BoxBody, + body::Body, client::GrpcService, metadata::{AsciiMetadataValue, KeyAndValueRef}, }; @@ -166,7 +166,7 @@ where impl RawClientLike for TemporalServiceClient where T: Send + Sync + Clone + 'static, - T: GrpcService + Send + Clone + 'static, + T: GrpcService + Send + Clone + 'static, T::ResponseBody: tonic::codegen::Body + Send + 'static, T::Error: Into, ::Error: Into + Send, @@ -221,7 +221,7 @@ where impl RawClientLike for ConfiguredClient> where T: Send + Sync + Clone + 'static, - T: GrpcService + Send + Clone + 'static, + T: GrpcService + Send + Clone + 'static, T::ResponseBody: tonic::codegen::Body + Send + 'static, T::Error: Into, ::Error: Into + Send, @@ -373,7 +373,7 @@ pub(super) struct IsUserLongPoll; impl WorkflowService for RC where RC: RawClientLike, - T: GrpcService + Send + Clone + 'static, + T: GrpcService + Send + Clone + 'static, T::ResponseBody: tonic::codegen::Body + Send + 'static, T::Error: Into, T::Future: Send, @@ -383,7 +383,7 @@ where impl OperatorService for RC where RC: RawClientLike, - T: GrpcService + Send + Clone + 'static, + T: GrpcService + Send + Clone + 'static, T::ResponseBody: tonic::codegen::Body + Send + 'static, T::Error: Into, T::Future: Send, @@ -393,7 +393,7 @@ where impl CloudService for RC where RC: RawClientLike, - T: GrpcService + Send + Clone + 'static, + T: GrpcService + Send + Clone + 'static, T::ResponseBody: tonic::codegen::Body + Send + 'static, T::Error: Into, T::Future: Send, @@ -403,7 +403,7 @@ where impl TestService for RC where RC: RawClientLike, - T: GrpcService + Send + Clone + 'static, + T: GrpcService + Send + Clone + 'static, T::ResponseBody: tonic::codegen::Body + Send + 'static, T::Error: Into, T::Future: Send, @@ -413,7 +413,7 @@ where impl HealthService for RC where RC: RawClientLike, - T: GrpcService + Send + Clone + 'static, + T: GrpcService + Send + Clone + 'static, T::ResponseBody: tonic::codegen::Body + Send + 'static, T::Error: Into, T::Future: Send, @@ -483,13 +483,13 @@ macro_rules! proxier { pub trait $trait_name: RawClientLike where // Yo this is wild - ::SvcType: GrpcService + Send + Clone + 'static, - <::SvcType as GrpcService>::ResponseBody: + ::SvcType: GrpcService + Send + Clone + 'static, + <::SvcType as GrpcService>::ResponseBody: tonic::codegen::Body + Send + 'static, - <::SvcType as GrpcService>::Error: + <::SvcType as GrpcService>::Error: Into, - <::SvcType as GrpcService>::Future: Send, - <<::SvcType as GrpcService>::ResponseBody + <::SvcType as GrpcService>::Future: Send, + <<::SvcType as GrpcService>::ResponseBody as tonic::codegen::Body>::Error: Into + Send, { $( diff --git a/core-api/Cargo.toml b/core-api/Cargo.toml index 971803e40..c7eaeefa4 100644 --- a/core-api/Cargo.toml +++ b/core-api/Cargo.toml @@ -14,10 +14,9 @@ categories = ["development-tools"] [features] otel_impls = ["dep:opentelemetry"] -envconfig = ["dep:toml", "dep:serde", "dep:dirs", "dep:anyhow"] +envconfig = ["dep:toml", "dep:serde", "dep:dirs"] [dependencies] -anyhow = { version = "1.0", optional = true } async-trait = "0.1" dirs = { version = "5.0", optional = true } derive_builder = { workspace = true } @@ -29,6 +28,7 @@ serde_json = "1.0" thiserror = { workspace = true } toml = { version = "0.8", optional = true } tonic = { workspace = true } +tracing = "0.1" tracing-core = "0.1" url = "2.3" diff --git a/core-api/src/envconfig.rs b/core-api/src/envconfig.rs index b0b198c26..efaf5e81d 100644 --- a/core-api/src/envconfig.rs +++ b/core-api/src/envconfig.rs @@ -50,9 +50,7 @@ //! ``` use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::fs; -use std::path::Path; +use std::{collections::HashMap, fs, path::Path}; use thiserror::Error; /// Default profile name when none is specified @@ -71,7 +69,7 @@ pub enum ConfigError { InvalidConfig(String), #[error("Configuration loading error: {0}")] - LoadError(anyhow::Error), + LoadError(Box), } impl From for ConfigError { diff --git a/core-api/src/lib.rs b/core-api/src/lib.rs index 82a13cc1b..c9e3dd5d3 100644 --- a/core-api/src/lib.rs +++ b/core-api/src/lib.rs @@ -138,3 +138,12 @@ pub trait Worker: Send + Sync { /// functions have returned `ShutDown` errors. async fn finalize_shutdown(self); } + +macro_rules! dbg_panic { + ($($arg:tt)*) => { + use tracing::error; + error!($($arg)*); + debug_assert!(false, $($arg)*); + }; +} +pub(crate) use dbg_panic; diff --git a/core-api/src/telemetry/metrics.rs b/core-api/src/telemetry/metrics.rs index 59335e907..78cb89996 100644 --- a/core-api/src/telemetry/metrics.rs +++ b/core-api/src/telemetry/metrics.rs @@ -1,7 +1,9 @@ +use crate::dbg_panic; use std::{ any::Any, borrow::Cow, - fmt::Debug, + collections::{BTreeMap, HashMap}, + fmt::{Debug, Display}, ops::Deref, sync::{Arc, OnceLock}, time::Duration, @@ -23,16 +25,16 @@ pub trait CoreMeter: Send + Sync + Debug { existing: MetricAttributes, attribs: NewAttributes, ) -> MetricAttributes; - fn counter(&self, params: MetricParameters) -> Arc; - fn histogram(&self, params: MetricParameters) -> Arc; - fn histogram_f64(&self, params: MetricParameters) -> Arc; + fn counter(&self, params: MetricParameters) -> Counter; + fn histogram(&self, params: MetricParameters) -> Histogram; + fn histogram_f64(&self, params: MetricParameters) -> HistogramF64; /// Create a histogram which records Durations. Implementations should choose to emit in /// either milliseconds or seconds depending on how they have been configured. /// [MetricParameters::unit] should be overwritten by implementations to be `ms` or `s` /// accordingly. - fn histogram_duration(&self, params: MetricParameters) -> Arc; - fn gauge(&self, params: MetricParameters) -> Arc; - fn gauge_f64(&self, params: MetricParameters) -> Arc; + fn histogram_duration(&self, params: MetricParameters) -> HistogramDuration; + fn gauge(&self, params: MetricParameters) -> Gauge; + fn gauge_f64(&self, params: MetricParameters) -> GaugeF64; } #[derive(Debug, Clone, derive_builder::Builder)] @@ -84,26 +86,26 @@ impl CoreMeter for Arc { self.as_ref().extend_attributes(existing, attribs) } - fn counter(&self, params: MetricParameters) -> Arc { + fn counter(&self, params: MetricParameters) -> Counter { self.as_ref().counter(params) } - fn histogram(&self, params: MetricParameters) -> Arc { + fn histogram(&self, params: MetricParameters) -> Histogram { self.as_ref().histogram(params) } - fn histogram_f64(&self, params: MetricParameters) -> Arc { + fn histogram_f64(&self, params: MetricParameters) -> HistogramF64 { self.as_ref().histogram_f64(params) } - fn histogram_duration(&self, params: MetricParameters) -> Arc { + fn histogram_duration(&self, params: MetricParameters) -> HistogramDuration { self.as_ref().histogram_duration(params) } - fn gauge(&self, params: MetricParameters) -> Arc { + fn gauge(&self, params: MetricParameters) -> Gauge { self.as_ref().gauge(params) } - fn gauge_f64(&self, params: MetricParameters) -> Arc { + fn gauge_f64(&self, params: MetricParameters) -> GaugeF64 { self.as_ref().gauge_f64(params) } } @@ -117,8 +119,12 @@ pub enum MetricAttributes { OTel { kvs: Arc>, }, + Prometheus { + labels: Arc, + }, Buffer(BufferAttributes), Dynamic(Arc), + Empty, } /// A reference to some attributes created lang side. @@ -150,7 +156,7 @@ where } /// A K/V pair that can be used to label a specific recording of a metric -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct MetricKeyValue { pub key: String, pub value: MetricValue, @@ -165,7 +171,7 @@ impl MetricKeyValue { } /// Values metric labels may assume -#[derive(Clone, Debug, derive_more::From)] +#[derive(Clone, Debug, PartialEq, derive_more::From)] pub enum MetricValue { String(String), Int(i64), @@ -178,30 +184,338 @@ impl From<&'static str> for MetricValue { MetricValue::String(value.to_string()) } } +impl Display for MetricValue { + fn fmt(&self, f1: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + MetricValue::String(s) => write!(f1, "{}", s), + MetricValue::Int(i) => write!(f1, "{}", i), + MetricValue::Float(f) => write!(f1, "{}", f), + MetricValue::Bool(b) => write!(f1, "{}", b), + } + } +} + +pub trait MetricAttributable { + /// Replace any existing attributes on this metric with new ones, and return a new copy + /// of the metric, or a base version, which can be used to record values. + /// + /// Note that this operation is relatively expensive compared to simply recording a value + /// without any additional attributes, so users should prefer to save the metric instance + /// after calling this, and use the value-only methods afterward. + /// + /// This operation may fail if the underlying metrics implementation disallows the registration + /// of a new metric, or encounters any other issue. + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result>; +} + +#[derive(Clone)] +pub struct LazyBoundMetric { + metric: T, + attributes: MetricAttributes, + bound_cache: OnceLock, +} +impl LazyBoundMetric { + pub fn update_attributes(&mut self, new_attributes: MetricAttributes) { + self.attributes = new_attributes; + self.bound_cache = OnceLock::new(); + } +} -pub trait Counter: Send + Sync { - fn add(&self, value: u64, attributes: &MetricAttributes); +pub trait CounterBase: Send + Sync { + fn adds(&self, value: u64); +} +pub type Counter = LazyBoundMetric< + Arc> + Send + Sync>, + Arc, +>; +impl Counter { + pub fn new(inner: Arc> + Send + Sync>) -> Self { + Self { + metric: inner, + attributes: MetricAttributes::Empty, + bound_cache: OnceLock::new(), + } + } + pub fn add(&self, value: u64, attributes: &MetricAttributes) { + if let Ok(base) = self.metric.with_attributes(attributes) { + base.adds(value); + } + } +} +impl CounterBase for Counter { + fn adds(&self, value: u64) { + // TODO: Replace all of these with below when stable + // https://doc.rust-lang.org/std/sync/struct.OnceLock.html#method.get_or_try_init + let bound = self.bound_cache.get_or_init(|| { + self.metric + .with_attributes(&self.attributes) + .map(Into::into) + .unwrap_or_else(|e| { + dbg_panic!("Failed to initialize metric, will drop values: {:?}", e); + Arc::new(NoOpInstrument) as Arc + }) + }); + bound.adds(value); + } +} +impl MetricAttributable for Counter { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result> { + Ok(Self { + metric: self.metric.clone(), + attributes: attributes.clone(), + bound_cache: OnceLock::new(), + }) + } +} + +pub trait HistogramBase: Send + Sync { + fn records(&self, value: u64); +} +pub type Histogram = LazyBoundMetric< + Arc> + Send + Sync>, + Arc, +>; +impl Histogram { + pub fn new(inner: Arc> + Send + Sync>) -> Self { + Self { + metric: inner, + attributes: MetricAttributes::Empty, + bound_cache: OnceLock::new(), + } + } + pub fn record(&self, value: u64, attributes: &MetricAttributes) { + if let Ok(base) = self.metric.with_attributes(attributes) { + base.records(value); + } + } +} +impl HistogramBase for Histogram { + fn records(&self, value: u64) { + let bound = self.bound_cache.get_or_init(|| { + self.metric + .with_attributes(&self.attributes) + .map(Into::into) + .unwrap_or_else(|e| { + dbg_panic!("Failed to initialize metric, will drop values: {:?}", e); + Arc::new(NoOpInstrument) as Arc + }) + }); + bound.records(value); + } +} +impl MetricAttributable for Histogram { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result> { + Ok(Self { + metric: self.metric.clone(), + attributes: attributes.clone(), + bound_cache: OnceLock::new(), + }) + } +} + +pub trait HistogramF64Base: Send + Sync { + fn records(&self, value: f64); +} +pub type HistogramF64 = LazyBoundMetric< + Arc> + Send + Sync>, + Arc, +>; +impl HistogramF64 { + pub fn new( + inner: Arc> + Send + Sync>, + ) -> Self { + Self { + metric: inner, + attributes: MetricAttributes::Empty, + bound_cache: OnceLock::new(), + } + } + pub fn record(&self, value: f64, attributes: &MetricAttributes) { + if let Ok(base) = self.metric.with_attributes(attributes) { + base.records(value); + } + } +} +impl HistogramF64Base for HistogramF64 { + fn records(&self, value: f64) { + let bound = self.bound_cache.get_or_init(|| { + self.metric + .with_attributes(&self.attributes) + .map(Into::into) + .unwrap_or_else(|e| { + dbg_panic!("Failed to initialize metric, will drop values: {:?}", e); + Arc::new(NoOpInstrument) as Arc + }) + }); + bound.records(value); + } +} +impl MetricAttributable for HistogramF64 { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result> { + Ok(Self { + metric: self.metric.clone(), + attributes: attributes.clone(), + bound_cache: OnceLock::new(), + }) + } +} + +pub trait HistogramDurationBase: Send + Sync { + fn records(&self, value: Duration); +} +pub type HistogramDuration = LazyBoundMetric< + Arc> + Send + Sync>, + Arc, +>; +impl HistogramDuration { + pub fn new( + inner: Arc> + Send + Sync>, + ) -> Self { + Self { + metric: inner, + attributes: MetricAttributes::Empty, + bound_cache: OnceLock::new(), + } + } + pub fn record(&self, value: Duration, attributes: &MetricAttributes) { + if let Ok(base) = self.metric.with_attributes(attributes) { + base.records(value); + } + } +} +impl HistogramDurationBase for HistogramDuration { + fn records(&self, value: Duration) { + let bound = self.bound_cache.get_or_init(|| { + self.metric + .with_attributes(&self.attributes) + .map(Into::into) + .unwrap_or_else(|e| { + dbg_panic!("Failed to initialize metric, will drop values: {:?}", e); + Arc::new(NoOpInstrument) as Arc + }) + }); + bound.records(value); + } +} +impl MetricAttributable for HistogramDuration { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result> { + Ok(Self { + metric: self.metric.clone(), + attributes: attributes.clone(), + bound_cache: OnceLock::new(), + }) + } } -pub trait Histogram: Send + Sync { - // When referring to durations, this value is in millis - fn record(&self, value: u64, attributes: &MetricAttributes); +pub trait GaugeBase: Send + Sync { + fn records(&self, value: u64); +} +pub type Gauge = LazyBoundMetric< + Arc> + Send + Sync>, + Arc, +>; +impl Gauge { + pub fn new(inner: Arc> + Send + Sync>) -> Self { + Self { + metric: inner, + attributes: MetricAttributes::Empty, + bound_cache: OnceLock::new(), + } + } + pub fn record(&self, value: u64, attributes: &MetricAttributes) { + if let Ok(base) = self.metric.with_attributes(attributes) { + base.records(value); + } + } } -pub trait HistogramF64: Send + Sync { - // When referring to durations, this value is in seconds - fn record(&self, value: f64, attributes: &MetricAttributes); +impl GaugeBase for Gauge { + fn records(&self, value: u64) { + let bound = self.bound_cache.get_or_init(|| { + self.metric + .with_attributes(&self.attributes) + .map(Into::into) + .unwrap_or_else(|e| { + dbg_panic!("Failed to initialize metric, will drop values: {:?}", e); + Arc::new(NoOpInstrument) as Arc + }) + }); + bound.records(value); + } } -pub trait HistogramDuration: Send + Sync { - fn record(&self, value: Duration, attributes: &MetricAttributes); +impl MetricAttributable for Gauge { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result> { + Ok(Self { + metric: self.metric.clone(), + attributes: attributes.clone(), + bound_cache: OnceLock::new(), + }) + } } -pub trait Gauge: Send + Sync { - // When referring to durations, this value is in millis - fn record(&self, value: u64, attributes: &MetricAttributes); +pub trait GaugeF64Base: Send + Sync { + fn records(&self, value: f64); +} +pub type GaugeF64 = LazyBoundMetric< + Arc> + Send + Sync>, + Arc, +>; +impl GaugeF64 { + pub fn new(inner: Arc> + Send + Sync>) -> Self { + Self { + metric: inner, + attributes: MetricAttributes::Empty, + bound_cache: OnceLock::new(), + } + } + pub fn record(&self, value: f64, attributes: &MetricAttributes) { + if let Ok(base) = self.metric.with_attributes(attributes) { + base.records(value); + } + } +} +impl GaugeF64Base for GaugeF64 { + fn records(&self, value: f64) { + let bound = self.bound_cache.get_or_init(|| { + self.metric + .with_attributes(&self.attributes) + .map(Into::into) + .unwrap_or_else(|e| { + dbg_panic!("Failed to initialize metric, will drop values: {:?}", e); + Arc::new(NoOpInstrument) as Arc + }) + }); + bound.records(value); + } } -pub trait GaugeF64: Send + Sync { - // When referring to durations, this value is in seconds - fn record(&self, value: f64, attributes: &MetricAttributes); +impl MetricAttributable for GaugeF64 { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result> { + Ok(Self { + metric: self.metric.clone(), + attributes: attributes.clone(), + bound_cache: OnceLock::new(), + }) + } } #[derive(Debug, Clone)] @@ -297,50 +611,65 @@ impl CoreMeter for NoOpCoreMeter { existing } - fn counter(&self, _: MetricParameters) -> Arc { - Arc::new(NoOpInstrument) + fn counter(&self, _: MetricParameters) -> Counter { + Counter::new(Arc::new(NoOpInstrument)) } - fn histogram(&self, _: MetricParameters) -> Arc { - Arc::new(NoOpInstrument) + fn histogram(&self, _: MetricParameters) -> Histogram { + Histogram::new(Arc::new(NoOpInstrument)) } - fn histogram_f64(&self, _: MetricParameters) -> Arc { - Arc::new(NoOpInstrument) + fn histogram_f64(&self, _: MetricParameters) -> HistogramF64 { + HistogramF64::new(Arc::new(NoOpInstrument)) } - fn histogram_duration(&self, _: MetricParameters) -> Arc { - Arc::new(NoOpInstrument) + fn histogram_duration(&self, _: MetricParameters) -> HistogramDuration { + HistogramDuration::new(Arc::new(NoOpInstrument)) } - fn gauge(&self, _: MetricParameters) -> Arc { - Arc::new(NoOpInstrument) + fn gauge(&self, _: MetricParameters) -> Gauge { + Gauge::new(Arc::new(NoOpInstrument)) } - fn gauge_f64(&self, _: MetricParameters) -> Arc { - Arc::new(NoOpInstrument) + fn gauge_f64(&self, _: MetricParameters) -> GaugeF64 { + GaugeF64::new(Arc::new(NoOpInstrument)) } } -pub struct NoOpInstrument; -impl Counter for NoOpInstrument { - fn add(&self, _: u64, _: &MetricAttributes) {} -} -impl Histogram for NoOpInstrument { - fn record(&self, _: u64, _: &MetricAttributes) {} -} -impl HistogramF64 for NoOpInstrument { - fn record(&self, _: f64, _: &MetricAttributes) {} -} -impl HistogramDuration for NoOpInstrument { - fn record(&self, _: Duration, _: &MetricAttributes) {} -} -impl Gauge for NoOpInstrument { - fn record(&self, _: u64, _: &MetricAttributes) {} +macro_rules! impl_metric_attributable { + ($base_trait:ident, $rt:ty, $init:expr) => { + impl MetricAttributable> for $rt { + fn with_attributes( + &self, + _: &MetricAttributes, + ) -> Result, Box> { + Ok(Box::new($init)) + } + } + }; } -impl GaugeF64 for NoOpInstrument { - fn record(&self, _: f64, _: &MetricAttributes) {} + +pub struct NoOpInstrument; +macro_rules! impl_no_op { + ($base_trait:ident, $value_type:ty) => { + impl_metric_attributable!($base_trait, NoOpInstrument, NoOpInstrument); + impl $base_trait for NoOpInstrument { + fn records(&self, _: $value_type) {} + } + }; + ($base_trait:ident) => { + impl_metric_attributable!($base_trait, NoOpInstrument, NoOpInstrument); + impl $base_trait for NoOpInstrument { + fn adds(&self, _: u64) {} + } + }; } +impl_no_op!(CounterBase); +impl_no_op!(HistogramBase, u64); +impl_no_op!(HistogramF64Base, f64); +impl_no_op!(HistogramDurationBase, Duration); +impl_no_op!(GaugeBase, u64); +impl_no_op!(GaugeF64Base, f64); #[derive(Debug, Clone)] pub struct NoOpAttributes; @@ -355,6 +684,12 @@ mod otel_impls { use super::*; use opentelemetry::{KeyValue, metrics}; + #[derive(Clone)] + struct InstrumentWithAttributes { + inner: I, + attributes: MetricAttributes, + } + impl From for KeyValue { fn from(kv: MetricKeyValue) -> Self { KeyValue::new(kv.key, kv.value) @@ -372,68 +707,142 @@ mod otel_impls { } } - impl Counter for metrics::Counter { - fn add(&self, value: u64, attributes: &MetricAttributes) { - if let MetricAttributes::OTel { kvs } = attributes { - self.add(value, kvs); + impl MetricAttributable> for metrics::Counter { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + Ok(Box::new(InstrumentWithAttributes { + inner: self.clone(), + attributes: attributes.clone(), + })) + } + } + + impl CounterBase for InstrumentWithAttributes> { + fn adds(&self, value: u64) { + if let MetricAttributes::OTel { kvs } = &self.attributes { + self.inner.add(value, kvs); } else { - debug_assert!( - false, - "Must use OTel attributes with an OTel metric implementation" - ); + dbg_panic!("Must use OTel attributes with an OTel metric implementation"); } } } - impl Gauge for metrics::Gauge { - fn record(&self, value: u64, attributes: &MetricAttributes) { - if let MetricAttributes::OTel { kvs } = attributes { - self.record(value, kvs); + impl MetricAttributable> for metrics::Gauge { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + Ok(Box::new(InstrumentWithAttributes { + inner: self.clone(), + attributes: attributes.clone(), + })) + } + } + + impl GaugeBase for InstrumentWithAttributes> { + fn records(&self, value: u64) { + if let MetricAttributes::OTel { kvs } = &self.attributes { + self.inner.record(value, kvs); } else { - debug_assert!( - false, - "Must use OTel attributes with an OTel metric implementation" - ); + dbg_panic!("Must use OTel attributes with an OTel metric implementation"); } } } - impl GaugeF64 for metrics::Gauge { - fn record(&self, value: f64, attributes: &MetricAttributes) { - if let MetricAttributes::OTel { kvs } = attributes { - self.record(value, kvs); + impl MetricAttributable> for metrics::Gauge { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + Ok(Box::new(InstrumentWithAttributes { + inner: self.clone(), + attributes: attributes.clone(), + })) + } + } + + impl GaugeF64Base for InstrumentWithAttributes> { + fn records(&self, value: f64) { + if let MetricAttributes::OTel { kvs } = &self.attributes { + self.inner.record(value, kvs); } else { - debug_assert!( - false, - "Must use OTel attributes with an OTel metric implementation" - ); + dbg_panic!("Must use OTel attributes with an OTel metric implementation"); } } } - impl Histogram for metrics::Histogram { - fn record(&self, value: u64, attributes: &MetricAttributes) { - if let MetricAttributes::OTel { kvs } = attributes { - self.record(value, kvs); + impl MetricAttributable> for metrics::Histogram { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + Ok(Box::new(InstrumentWithAttributes { + inner: self.clone(), + attributes: attributes.clone(), + })) + } + } + + impl HistogramBase for InstrumentWithAttributes> { + fn records(&self, value: u64) { + if let MetricAttributes::OTel { kvs } = &self.attributes { + self.inner.record(value, kvs); } else { - debug_assert!( - false, - "Must use OTel attributes with an OTel metric implementation" - ); + dbg_panic!("Must use OTel attributes with an OTel metric implementation"); } } } - impl HistogramF64 for metrics::Histogram { - fn record(&self, value: f64, attributes: &MetricAttributes) { - if let MetricAttributes::OTel { kvs } = attributes { - self.record(value, kvs); + impl MetricAttributable> for metrics::Histogram { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + Ok(Box::new(InstrumentWithAttributes { + inner: self.clone(), + attributes: attributes.clone(), + })) + } + } + + impl HistogramF64Base for InstrumentWithAttributes> { + fn records(&self, value: f64) { + if let MetricAttributes::OTel { kvs } = &self.attributes { + self.inner.record(value, kvs); } else { - debug_assert!( - false, - "Must use OTel attributes with an OTel metric implementation" - ); + dbg_panic!("Must use OTel attributes with an OTel metric implementation"); } } } } + +/// Maintains a mapping of metric labels->values with a defined ordering, used for Prometheus labels +#[derive(Debug, Clone, PartialEq)] +pub struct OrderedMetricLabelSet { + pub attributes: BTreeMap, +} + +impl OrderedMetricLabelSet { + pub fn keys_ordered(&self) -> impl Iterator { + self.attributes.keys().map(|s| s.as_str()) + } + pub fn as_prom_labels(&self) -> HashMap<&str, String> { + let mut labels = HashMap::new(); + for (k, v) in self.attributes.iter() { + labels.insert(k.as_str(), v.to_string()); + } + labels + } +} + +impl From for OrderedMetricLabelSet { + fn from(n: NewAttributes) -> Self { + let mut attributes = BTreeMap::new(); + for kv in n.attributes { + attributes.insert(kv.key.clone(), kv.value); + } + Self { attributes } + } +} diff --git a/core/Cargo.toml b/core/Cargo.toml index 89ee57c6b..a7accd34c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -13,9 +13,10 @@ categories = ["development-tools"] [lib] [features] -default = ["otel"] -otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp", - "dep:opentelemetry-prometheus", "dep:hyper", "dep:hyper-util", "dep:http-body-util"] +default = ["otel", "prom"] +otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp", "dep:hyper", + "dep:hyper-util", "dep:http-body-util"] +prom = ["dep:prometheus"] tokio-console = ["console-subscriber"] ephemeral-server = ["dep:flate2", "dep:reqwest", "dep:tar", "dep:zip"] debug-plugin = ["dep:reqwest"] @@ -43,13 +44,12 @@ itertools = "0.14" lru = "0.13" mockall = "0.13" opentelemetry = { workspace = true, features = ["metrics"], optional = true } -opentelemetry_sdk = { version = "0.29", features = ["rt-tokio", "metrics", "spec_unstable_metrics_views"], optional = true } -opentelemetry-otlp = { version = "0.29", features = ["tokio", "metrics", "tls", "http-proto", "grpc-tonic"], optional = true } -opentelemetry-prometheus = { version = "0.29", optional = true } +opentelemetry_sdk = { version = "0.30", features = ["rt-tokio", "metrics", "spec_unstable_metrics_views"], optional = true } +opentelemetry-otlp = { version = "0.30", features = ["tokio", "metrics", "tls", "http-proto", "grpc-tonic"], optional = true } parking_lot = { version = "0.12", features = ["send_guard"] } pid = "4.0" pin-project = "1.0" -prometheus = "0.14" +prometheus = { version = "0.14", optional = true } prost = { workspace = true } prost-types = { version = "0.6", package = "prost-wkt-types" } rand = "0.9" @@ -65,7 +65,7 @@ thiserror = { workspace = true } tokio = { version = "1.37", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs", "process"] } tokio-util = { version = "0.7", features = ["io", "io-util"] } tokio-stream = "0.1" -tonic = { workspace = true, features = ["tls", "tls-roots"] } +tonic = { workspace = true, features = ["tls-ring", "tls-native-roots"] } tracing = "0.1" tracing-subscriber = { version = "0.3", default-features = false, features = ["parking_lot", "env-filter", "registry", "ansi"] } url = "2.2" @@ -91,7 +91,7 @@ path = "../fsm" assert_matches = "1.4" bimap = "0.6.1" clap = { version = "4.0", features = ["derive"] } -criterion = "0.5" +criterion = { version = "0.6", features = ["async", "async_tokio"] } rstest = "0.25" temporal-sdk-core-test-utils = { path = "../test-utils" } temporal-sdk = { path = "../sdk" } diff --git a/core/benches/workflow_replay.rs b/core/benches/workflow_replay.rs index 244d73e17..082190846 100644 --- a/core/benches/workflow_replay.rs +++ b/core/benches/workflow_replay.rs @@ -1,10 +1,19 @@ -use criterion::{Criterion, criterion_group, criterion_main}; +use criterion::{BatchSize, Criterion, criterion_group, criterion_main}; use futures_util::StreamExt; -use std::time::Duration; +use std::{ + sync::{Arc, mpsc}, + thread, + time::Duration, +}; use temporal_sdk::{WfContext, WorkflowFunction}; -use temporal_sdk_core::replay::HistoryForReplay; +use temporal_sdk_core::{CoreRuntime, replay::HistoryForReplay}; +use temporal_sdk_core_api::telemetry::metrics::{ + MetricKeyValue, MetricParametersBuilder, NewAttributes, +}; use temporal_sdk_core_protos::DEFAULT_WORKFLOW_TYPE; -use temporal_sdk_core_test_utils::{canned_histories, replay_sdk_worker}; +use temporal_sdk_core_test_utils::{ + DONT_AUTO_INIT_INTEG_TELEM, canned_histories, prom_metrics, replay_sdk_worker, +}; pub fn criterion_benchmark(c: &mut Criterion) { let tokio_runtime = tokio::runtime::Builder::new_current_thread() @@ -12,6 +21,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { .build() .unwrap(); let _g = tokio_runtime.enter(); + DONT_AUTO_INIT_INTEG_TELEM.set(true); let num_timers = 10; let t = canned_histories::long_sequential_timers(num_timers as usize); @@ -21,14 +31,17 @@ pub fn criterion_benchmark(c: &mut Criterion) { ); c.bench_function("Small history replay", |b| { - b.iter(|| { - tokio_runtime.block_on(async { + b.to_async(&tokio_runtime).iter_batched( + || { let func = timers_wf(num_timers); - let mut worker = replay_sdk_worker([hist.clone()]); + (func, replay_sdk_worker([hist.clone()])) + }, + |(func, mut worker)| async move { worker.register_wf(DEFAULT_WORKFLOW_TYPE, func); worker.run().await.unwrap(); - }) - }) + }, + BatchSize::SmallInput, + ) }); let num_tasks = 50; @@ -39,18 +52,104 @@ pub fn criterion_benchmark(c: &mut Criterion) { ); c.bench_function("Large payloads history replay", |b| { - b.iter(|| { - tokio_runtime.block_on(async { + b.to_async(&tokio_runtime).iter_batched( + || { let func = big_signals_wf(num_tasks); - let mut worker = replay_sdk_worker([hist.clone()]); + (func, replay_sdk_worker([hist.clone()])) + }, + |(func, mut worker)| async move { worker.register_wf(DEFAULT_WORKFLOW_TYPE, func); worker.run().await.unwrap(); - }) - }) + }, + BatchSize::SmallInput, + ) }); } -criterion_group!(benches, criterion_benchmark); +pub fn bench_metrics(c: &mut Criterion) { + DONT_AUTO_INIT_INTEG_TELEM.set(true); + let tokio_runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let _tokio = tokio_runtime.enter(); + let (mut telemopts, addr, _aborter) = prom_metrics(None); + telemopts.logging = None; + let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); + let meter = rt.telemetry().get_metric_meter().unwrap(); + + c.bench_function("Record with new attributes on each call", move |b| { + b.iter_batched( + || { + let c = meter.counter( + MetricParametersBuilder::default() + .name("c") + .build() + .unwrap(), + ); + let h = meter.histogram( + MetricParametersBuilder::default() + .name("h") + .build() + .unwrap(), + ); + let g = meter.gauge( + MetricParametersBuilder::default() + .name("g") + .build() + .unwrap(), + ); + + let vals = [1, 2, 3, 4, 5]; + let labels = ["l1", "l2"]; + + let (start_tx, start_rx) = mpsc::channel(); + let start_rx = Arc::new(std::sync::Mutex::new(start_rx)); + + let mut thread_handles = Vec::new(); + for _ in 0..3 { + let c = c.clone(); + let h = h.clone(); + let g = g.clone(); + let meter = meter.clone(); + let start_rx = start_rx.clone(); + + let handle = thread::spawn(move || { + // Wait for start signal + let _ = start_rx.lock().unwrap().recv(); + + for _ in 1..=100 { + for &val in &vals { + for &label in &labels { + let attribs = meter.new_attributes(NewAttributes::from(vec![ + MetricKeyValue::new("label", label), + ])); + c.add(val, &attribs); + h.record(val, &attribs); + g.record(val, &attribs); + } + } + } + }); + thread_handles.push(handle); + } + + (start_tx, thread_handles) + }, + |(start_tx, thread_handles)| { + for _ in 0..3 { + let _ = start_tx.send(()); + } + for handle in thread_handles { + let _ = handle.join(); + } + }, + BatchSize::SmallInput, + ) + }); +} + +criterion_group!(benches, criterion_benchmark, bench_metrics); criterion_main!(benches); fn timers_wf(num_timers: u32) -> WorkflowFunction { diff --git a/core/src/telemetry/metrics.rs b/core/src/telemetry/metrics.rs index 95c0b25e4..a51220b44 100644 --- a/core/src/telemetry/metrics.rs +++ b/core/src/telemetry/metrics.rs @@ -7,18 +7,17 @@ use std::{ time::Duration, }; use temporal_sdk_core_api::telemetry::metrics::{ - BufferAttributes, BufferInstrumentRef, CoreMeter, Counter, Gauge, GaugeF64, Histogram, - HistogramDuration, HistogramF64, LazyBufferInstrument, MetricAttributes, MetricCallBufferer, - MetricEvent, MetricKeyValue, MetricKind, MetricParameters, MetricUpdateVal, NewAttributes, - NoOpCoreMeter, + BufferAttributes, BufferInstrumentRef, CoreMeter, Counter, CounterBase, Gauge, GaugeBase, + GaugeF64, GaugeF64Base, Histogram, HistogramBase, HistogramDuration, HistogramDurationBase, + HistogramF64, HistogramF64Base, LazyBufferInstrument, MetricAttributable, MetricAttributes, + MetricCallBufferer, MetricEvent, MetricKeyValue, MetricKind, MetricParameters, MetricUpdateVal, + NewAttributes, NoOpCoreMeter, +}; +use temporal_sdk_core_protos::temporal::api::{ + enums::v1::WorkflowTaskFailedCause, failure::v1::Failure, }; -use temporal_sdk_core_protos::temporal::api::enums::v1::WorkflowTaskFailedCause; -use temporal_sdk_core_protos::temporal::api::failure::v1::Failure; /// Used to track context associated with metrics, and record/update them -/// -/// Possible improvement: make generic over some type tag so that methods are only exposed if the -/// appropriate k/vs have already been set. #[derive(Clone)] pub(crate) struct MetricsContext { meter: Arc, @@ -26,50 +25,53 @@ pub(crate) struct MetricsContext { instruments: Arc, } +#[derive(Clone)] struct Instruments { - wf_completed_counter: Arc, - wf_canceled_counter: Arc, - wf_failed_counter: Arc, - wf_cont_counter: Arc, - wf_e2e_latency: Arc, - wf_task_queue_poll_empty_counter: Arc, - wf_task_queue_poll_succeed_counter: Arc, - wf_task_execution_failure_counter: Arc, - wf_task_sched_to_start_latency: Arc, - wf_task_replay_latency: Arc, - wf_task_execution_latency: Arc, - act_poll_no_task: Arc, - act_task_received_counter: Arc, - act_execution_failed: Arc, - act_sched_to_start_latency: Arc, - act_exec_latency: Arc, - act_exec_succeeded_latency: Arc, - la_execution_cancelled: Arc, - la_execution_failed: Arc, - la_exec_latency: Arc, - la_exec_succeeded_latency: Arc, - la_total: Arc, - nexus_poll_no_task: Arc, - nexus_task_schedule_to_start_latency: Arc, - nexus_task_e2e_latency: Arc, - nexus_task_execution_latency: Arc, - nexus_task_execution_failed: Arc, - worker_registered: Arc, - num_pollers: Arc, - task_slots_available: Arc, - task_slots_used: Arc, - sticky_cache_hit: Arc, - sticky_cache_miss: Arc, - sticky_cache_size: Arc, - sticky_cache_forced_evictions: Arc, + wf_completed_counter: Counter, + wf_canceled_counter: Counter, + wf_failed_counter: Counter, + wf_cont_counter: Counter, + wf_e2e_latency: HistogramDuration, + wf_task_queue_poll_empty_counter: Counter, + wf_task_queue_poll_succeed_counter: Counter, + wf_task_execution_failure_counter: Counter, + wf_task_sched_to_start_latency: HistogramDuration, + wf_task_replay_latency: HistogramDuration, + wf_task_execution_latency: HistogramDuration, + act_poll_no_task: Counter, + act_task_received_counter: Counter, + act_execution_failed: Counter, + act_sched_to_start_latency: HistogramDuration, + act_exec_latency: HistogramDuration, + act_exec_succeeded_latency: HistogramDuration, + la_execution_cancelled: Counter, + la_execution_failed: Counter, + la_exec_latency: HistogramDuration, + la_exec_succeeded_latency: HistogramDuration, + la_total: Counter, + nexus_poll_no_task: Counter, + nexus_task_schedule_to_start_latency: HistogramDuration, + nexus_task_e2e_latency: HistogramDuration, + nexus_task_execution_latency: HistogramDuration, + nexus_task_execution_failed: Counter, + worker_registered: Counter, + num_pollers: Gauge, + task_slots_available: Gauge, + task_slots_used: Gauge, + sticky_cache_hit: Counter, + sticky_cache_miss: Counter, + sticky_cache_size: Gauge, + sticky_cache_forced_evictions: Counter, } impl MetricsContext { pub(crate) fn no_op() -> Self { let meter = Arc::new(NoOpCoreMeter); + let kvs = meter.new_attributes(Default::default()); + let instruments = Arc::new(Instruments::new(meter.as_ref())); Self { - kvs: meter.new_attributes(Default::default()), - instruments: Arc::new(Instruments::new(meter.as_ref())), + kvs, + instruments, meter, } } @@ -82,9 +84,11 @@ impl MetricsContext { .push(MetricKeyValue::new(KEY_NAMESPACE, namespace)); meter.default_attribs.attributes.push(task_queue(tq)); let kvs = meter.inner.new_attributes(meter.default_attribs); + let mut instruments = Instruments::new(meter.inner.as_ref()); + instruments.update_attributes(&kvs); Self { kvs, - instruments: Arc::new(Instruments::new(meter.inner.as_ref())), + instruments: Arc::new(instruments), meter: meter.inner, } } else { @@ -100,212 +104,186 @@ impl MetricsContext { let kvs = self .meter .extend_attributes(self.kvs.clone(), new_attrs.into()); + let mut instruments = (*self.instruments).clone(); + instruments.update_attributes(&kvs); Self { + instruments: Arc::new(instruments), kvs, - instruments: self.instruments.clone(), meter: self.meter.clone(), } } /// A workflow task queue poll succeeded pub(crate) fn wf_tq_poll_ok(&self) { - self.instruments - .wf_task_queue_poll_succeed_counter - .add(1, &self.kvs); + self.instruments.wf_task_queue_poll_succeed_counter.adds(1); } /// A workflow task queue poll timed out / had empty response pub(crate) fn wf_tq_poll_empty(&self) { - self.instruments - .wf_task_queue_poll_empty_counter - .add(1, &self.kvs); + self.instruments.wf_task_queue_poll_empty_counter.adds(1); } /// A workflow task execution failed pub(crate) fn wf_task_failed(&self) { - self.instruments - .wf_task_execution_failure_counter - .add(1, &self.kvs); + self.instruments.wf_task_execution_failure_counter.adds(1); } /// A workflow completed successfully pub(crate) fn wf_completed(&self) { - self.instruments.wf_completed_counter.add(1, &self.kvs); + self.instruments.wf_completed_counter.adds(1); } /// A workflow ended cancelled pub(crate) fn wf_canceled(&self) { - self.instruments.wf_canceled_counter.add(1, &self.kvs); + self.instruments.wf_canceled_counter.adds(1); } /// A workflow ended failed pub(crate) fn wf_failed(&self) { - self.instruments.wf_failed_counter.add(1, &self.kvs); + self.instruments.wf_failed_counter.adds(1); } /// A workflow continued as new pub(crate) fn wf_continued_as_new(&self) { - self.instruments.wf_cont_counter.add(1, &self.kvs); + self.instruments.wf_cont_counter.adds(1); } /// Record workflow total execution time in milliseconds pub(crate) fn wf_e2e_latency(&self, dur: Duration) { - self.instruments.wf_e2e_latency.record(dur, &self.kvs); + self.instruments.wf_e2e_latency.records(dur); } /// Record workflow task schedule to start time in millis pub(crate) fn wf_task_sched_to_start_latency(&self, dur: Duration) { - self.instruments - .wf_task_sched_to_start_latency - .record(dur, &self.kvs); + self.instruments.wf_task_sched_to_start_latency.records(dur); } /// Record workflow task execution time in milliseconds pub(crate) fn wf_task_latency(&self, dur: Duration) { - self.instruments - .wf_task_execution_latency - .record(dur, &self.kvs); + self.instruments.wf_task_execution_latency.records(dur); } /// Record time it takes to catch up on replaying a WFT pub(crate) fn wf_task_replay_latency(&self, dur: Duration) { - self.instruments - .wf_task_replay_latency - .record(dur, &self.kvs); + self.instruments.wf_task_replay_latency.records(dur); } /// An activity long poll timed out pub(crate) fn act_poll_timeout(&self) { - self.instruments.act_poll_no_task.add(1, &self.kvs); + self.instruments.act_poll_no_task.adds(1); } /// A count of activity tasks received pub(crate) fn act_task_received(&self) { - self.instruments.act_task_received_counter.add(1, &self.kvs); + self.instruments.act_task_received_counter.adds(1); } /// An activity execution failed pub(crate) fn act_execution_failed(&self) { - self.instruments.act_execution_failed.add(1, &self.kvs); + self.instruments.act_execution_failed.adds(1); } /// Record end-to-end (sched-to-complete) time for successful activity executions pub(crate) fn act_execution_succeeded(&self, dur: Duration) { - self.instruments - .act_exec_succeeded_latency - .record(dur, &self.kvs); + self.instruments.act_exec_succeeded_latency.records(dur); } /// Record activity task schedule to start time in millis pub(crate) fn act_sched_to_start_latency(&self, dur: Duration) { - self.instruments - .act_sched_to_start_latency - .record(dur, &self.kvs); + self.instruments.act_sched_to_start_latency.records(dur); } /// Record time it took to complete activity execution, from the time core generated the /// activity task, to the time lang responded with a completion (failure or success). pub(crate) fn act_execution_latency(&self, dur: Duration) { - self.instruments.act_exec_latency.record(dur, &self.kvs); + self.instruments.act_exec_latency.records(dur); } pub(crate) fn la_execution_cancelled(&self) { - self.instruments.la_execution_cancelled.add(1, &self.kvs); + self.instruments.la_execution_cancelled.adds(1); } pub(crate) fn la_execution_failed(&self) { - self.instruments.la_execution_failed.add(1, &self.kvs); + self.instruments.la_execution_failed.adds(1); } pub(crate) fn la_exec_latency(&self, dur: Duration) { - self.instruments.la_exec_latency.record(dur, &self.kvs); + self.instruments.la_exec_latency.records(dur); } pub(crate) fn la_exec_succeeded_latency(&self, dur: Duration) { - self.instruments - .la_exec_succeeded_latency - .record(dur, &self.kvs); + self.instruments.la_exec_succeeded_latency.records(dur); } pub(crate) fn la_executed(&self) { - self.instruments.la_total.add(1, &self.kvs); + self.instruments.la_total.adds(1); } /// A nexus long poll timed out pub(crate) fn nexus_poll_timeout(&self) { - self.instruments.nexus_poll_no_task.add(1, &self.kvs); + self.instruments.nexus_poll_no_task.adds(1); } /// Record nexus task schedule to start time pub(crate) fn nexus_task_sched_to_start_latency(&self, dur: Duration) { self.instruments .nexus_task_schedule_to_start_latency - .record(dur, &self.kvs); + .records(dur); } /// Record nexus task end-to-end time pub(crate) fn nexus_task_e2e_latency(&self, dur: Duration) { - self.instruments - .nexus_task_e2e_latency - .record(dur, &self.kvs); + self.instruments.nexus_task_e2e_latency.records(dur); } /// Record nexus task execution time pub(crate) fn nexus_task_execution_latency(&self, dur: Duration) { - self.instruments - .nexus_task_execution_latency - .record(dur, &self.kvs); + self.instruments.nexus_task_execution_latency.records(dur); } /// Record a nexus task execution failure pub(crate) fn nexus_task_execution_failed(&self) { - self.instruments - .nexus_task_execution_failed - .add(1, &self.kvs); + self.instruments.nexus_task_execution_failed.adds(1); } /// A worker was registered pub(crate) fn worker_registered(&self) { - self.instruments.worker_registered.add(1, &self.kvs); + self.instruments.worker_registered.adds(1); } /// Record current number of available task slots. Context should have worker type set. pub(crate) fn available_task_slots(&self, num: usize) { - self.instruments - .task_slots_available - .record(num as u64, &self.kvs) + self.instruments.task_slots_available.records(num as u64) } /// Record current number of used task slots. Context should have worker type set. pub(crate) fn task_slots_used(&self, num: u64) { - self.instruments.task_slots_used.record(num, &self.kvs) + self.instruments.task_slots_used.records(num) } /// Record current number of pollers. Context should include poller type / task queue tag. pub(crate) fn record_num_pollers(&self, num: usize) { - self.instruments.num_pollers.record(num as u64, &self.kvs); + self.instruments.num_pollers.records(num as u64); } /// A workflow task found a cached workflow to run against pub(crate) fn sticky_cache_hit(&self) { - self.instruments.sticky_cache_hit.add(1, &self.kvs); + self.instruments.sticky_cache_hit.adds(1); } /// A workflow task did not find a cached workflow pub(crate) fn sticky_cache_miss(&self) { - self.instruments.sticky_cache_miss.add(1, &self.kvs); + self.instruments.sticky_cache_miss.adds(1); } /// Record current cache size (in number of wfs, not bytes) pub(crate) fn cache_size(&self, size: u64) { - self.instruments.sticky_cache_size.record(size, &self.kvs); + self.instruments.sticky_cache_size.records(size); } /// Count a workflow being evicted from the cache pub(crate) fn forced_cache_eviction(&self) { - self.instruments - .sticky_cache_forced_evictions - .add(1, &self.kvs); + self.instruments.sticky_cache_forced_evictions.adds(1); } } @@ -496,6 +474,77 @@ impl Instruments { }), } } + + fn update_attributes(&mut self, new_attributes: &MetricAttributes) { + self.wf_completed_counter + .update_attributes(new_attributes.clone()); + self.wf_canceled_counter + .update_attributes(new_attributes.clone()); + self.wf_failed_counter + .update_attributes(new_attributes.clone()); + self.wf_cont_counter + .update_attributes(new_attributes.clone()); + self.wf_e2e_latency + .update_attributes(new_attributes.clone()); + self.wf_task_queue_poll_empty_counter + .update_attributes(new_attributes.clone()); + self.wf_task_queue_poll_succeed_counter + .update_attributes(new_attributes.clone()); + self.wf_task_execution_failure_counter + .update_attributes(new_attributes.clone()); + self.wf_task_sched_to_start_latency + .update_attributes(new_attributes.clone()); + self.wf_task_replay_latency + .update_attributes(new_attributes.clone()); + self.wf_task_execution_latency + .update_attributes(new_attributes.clone()); + self.act_poll_no_task + .update_attributes(new_attributes.clone()); + self.act_task_received_counter + .update_attributes(new_attributes.clone()); + self.act_execution_failed + .update_attributes(new_attributes.clone()); + self.act_sched_to_start_latency + .update_attributes(new_attributes.clone()); + self.act_exec_latency + .update_attributes(new_attributes.clone()); + self.act_exec_succeeded_latency + .update_attributes(new_attributes.clone()); + self.la_execution_cancelled + .update_attributes(new_attributes.clone()); + self.la_execution_failed + .update_attributes(new_attributes.clone()); + self.la_exec_latency + .update_attributes(new_attributes.clone()); + self.la_exec_succeeded_latency + .update_attributes(new_attributes.clone()); + self.la_total.update_attributes(new_attributes.clone()); + self.nexus_poll_no_task + .update_attributes(new_attributes.clone()); + self.nexus_task_schedule_to_start_latency + .update_attributes(new_attributes.clone()); + self.nexus_task_e2e_latency + .update_attributes(new_attributes.clone()); + self.nexus_task_execution_latency + .update_attributes(new_attributes.clone()); + self.nexus_task_execution_failed + .update_attributes(new_attributes.clone()); + self.worker_registered + .update_attributes(new_attributes.clone()); + self.num_pollers.update_attributes(new_attributes.clone()); + self.task_slots_available + .update_attributes(new_attributes.clone()); + self.task_slots_used + .update_attributes(new_attributes.clone()); + self.sticky_cache_hit + .update_attributes(new_attributes.clone()); + self.sticky_cache_miss + .update_attributes(new_attributes.clone()); + self.sticky_cache_size + .update_attributes(new_attributes.clone()); + self.sticky_cache_forced_evictions + .update_attributes(new_attributes.clone()); + } } const KEY_NAMESPACE: &str = "namespace"; @@ -756,28 +805,30 @@ where } } - fn counter(&self, params: MetricParameters) -> Arc { - Arc::new(self.new_instrument(params, MetricKind::Counter)) + fn counter(&self, params: MetricParameters) -> Counter { + Counter::new(Arc::new(self.new_instrument(params, MetricKind::Counter))) } - fn histogram(&self, params: MetricParameters) -> Arc { - Arc::new(self.new_instrument(params, MetricKind::Histogram)) + fn histogram(&self, params: MetricParameters) -> Histogram { + Histogram::new(Arc::new(self.new_instrument(params, MetricKind::Histogram))) } - fn histogram_f64(&self, params: MetricParameters) -> Arc { - Arc::new(self.new_instrument(params, MetricKind::HistogramF64)) + fn histogram_f64(&self, params: MetricParameters) -> HistogramF64 { + HistogramF64::new(Arc::new(self.new_instrument(params, MetricKind::Histogram))) } - fn histogram_duration(&self, params: MetricParameters) -> Arc { - Arc::new(self.new_instrument(params, MetricKind::HistogramDuration)) + fn histogram_duration(&self, params: MetricParameters) -> HistogramDuration { + HistogramDuration::new(Arc::new( + self.new_instrument(params, MetricKind::HistogramDuration), + )) } - fn gauge(&self, params: MetricParameters) -> Arc { - Arc::new(self.new_instrument(params, MetricKind::Gauge)) + fn gauge(&self, params: MetricParameters) -> Gauge { + Gauge::new(Arc::new(self.new_instrument(params, MetricKind::Gauge))) } - fn gauge_f64(&self, params: MetricParameters) -> Arc { - Arc::new(self.new_instrument(params, MetricKind::GaugeF64)) + fn gauge_f64(&self, params: MetricParameters) -> GaugeF64 { + GaugeF64::new(Arc::new(self.new_instrument(params, MetricKind::Gauge))) } } impl MetricCallBufferer for MetricsCallBuffer @@ -789,6 +840,7 @@ where } } +#[derive(Clone)] struct BufferInstrument { instrument_ref: LazyBufferInstrument, tx: LogErrOnFullSender>, @@ -800,7 +852,7 @@ where fn send(&self, value: MetricUpdateVal, attributes: &MetricAttributes) { let attributes = match attributes { MetricAttributes::Buffer(l) => l.clone(), - _ => panic!("MetricsCallBuffer only works with MetricAttributes::Lang"), + _ => panic!("MetricsCallBuffer only works with MetricAttributes::Buffer"), }; self.tx.send(MetricEvent::Update { instrument: self.instrument_ref.clone(), @@ -809,52 +861,154 @@ where }); } } -impl Counter for BufferInstrument + +#[derive(Clone)] +struct InstrumentWithAttributes { + inner: I, + attributes: MetricAttributes, +} + +impl MetricAttributable> for BufferInstrument +where + I: BufferInstrumentRef + Send + Sync + Clone + 'static, +{ + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + Ok(Box::new(InstrumentWithAttributes { + inner: self.clone(), + attributes: attributes.clone(), + })) + } +} + +impl MetricAttributable> for BufferInstrument +where + I: BufferInstrumentRef + Send + Sync + Clone + 'static, +{ + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + Ok(Box::new(InstrumentWithAttributes { + inner: self.clone(), + attributes: attributes.clone(), + })) + } +} + +impl MetricAttributable> for BufferInstrument +where + I: BufferInstrumentRef + Send + Sync + Clone + 'static, +{ + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + Ok(Box::new(InstrumentWithAttributes { + inner: self.clone(), + attributes: attributes.clone(), + })) + } +} + +impl MetricAttributable> for BufferInstrument +where + I: BufferInstrumentRef + Send + Sync + Clone + 'static, +{ + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + Ok(Box::new(InstrumentWithAttributes { + inner: self.clone(), + attributes: attributes.clone(), + })) + } +} + +impl MetricAttributable> for BufferInstrument +where + I: BufferInstrumentRef + Send + Sync + Clone + 'static, +{ + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + Ok(Box::new(InstrumentWithAttributes { + inner: self.clone(), + attributes: attributes.clone(), + })) + } +} + +impl MetricAttributable> for BufferInstrument +where + I: BufferInstrumentRef + Send + Sync + Clone + 'static, +{ + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + Ok(Box::new(InstrumentWithAttributes { + inner: self.clone(), + attributes: attributes.clone(), + })) + } +} +impl CounterBase for InstrumentWithAttributes> where - I: BufferInstrumentRef + Send + Sync + Clone, + I: BufferInstrumentRef + Send + Sync + Clone + 'static, { - fn add(&self, value: u64, attributes: &MetricAttributes) { - self.send(MetricUpdateVal::Delta(value), attributes) + fn adds(&self, value: u64) { + self.inner + .send(MetricUpdateVal::Delta(value), &self.attributes) } } -impl Gauge for BufferInstrument +impl GaugeBase for InstrumentWithAttributes> where - I: BufferInstrumentRef + Send + Sync + Clone, + I: BufferInstrumentRef + Send + Sync + Clone + 'static, { - fn record(&self, value: u64, attributes: &MetricAttributes) { - self.send(MetricUpdateVal::Value(value), attributes) + fn records(&self, value: u64) { + self.inner + .send(MetricUpdateVal::Value(value), &self.attributes) } } -impl GaugeF64 for BufferInstrument +impl GaugeF64Base for InstrumentWithAttributes> where - I: BufferInstrumentRef + Send + Sync + Clone, + I: BufferInstrumentRef + Send + Sync + Clone + 'static, { - fn record(&self, value: f64, attributes: &MetricAttributes) { - self.send(MetricUpdateVal::ValueF64(value), attributes) + fn records(&self, value: f64) { + self.inner + .send(MetricUpdateVal::ValueF64(value), &self.attributes) } } -impl Histogram for BufferInstrument +impl HistogramBase for InstrumentWithAttributes> where - I: BufferInstrumentRef + Send + Sync + Clone, + I: BufferInstrumentRef + Send + Sync + Clone + 'static, { - fn record(&self, value: u64, attributes: &MetricAttributes) { - self.send(MetricUpdateVal::Value(value), attributes) + fn records(&self, value: u64) { + self.inner + .send(MetricUpdateVal::Value(value), &self.attributes) } } -impl HistogramF64 for BufferInstrument +impl HistogramF64Base for InstrumentWithAttributes> where - I: BufferInstrumentRef + Send + Sync + Clone, + I: BufferInstrumentRef + Send + Sync + Clone + 'static, { - fn record(&self, value: f64, attributes: &MetricAttributes) { - self.send(MetricUpdateVal::ValueF64(value), attributes) + fn records(&self, value: f64) { + self.inner + .send(MetricUpdateVal::ValueF64(value), &self.attributes) } } -impl HistogramDuration for BufferInstrument +impl HistogramDurationBase for InstrumentWithAttributes> where - I: BufferInstrumentRef + Send + Sync + Clone, + I: BufferInstrumentRef + Send + Sync + Clone + 'static, { - fn record(&self, value: Duration, attributes: &MetricAttributes) { - self.send(MetricUpdateVal::Duration(value), attributes) + fn records(&self, value: Duration) { + self.inner + .send(MetricUpdateVal::Duration(value), &self.attributes) } } @@ -876,32 +1030,32 @@ impl CoreMeter for PrefixedMetricsMeter { self.meter.extend_attributes(existing, attribs) } - fn counter(&self, mut params: MetricParameters) -> Arc { + fn counter(&self, mut params: MetricParameters) -> Counter { params.name = (self.prefix.clone() + &*params.name).into(); self.meter.counter(params) } - fn histogram(&self, mut params: MetricParameters) -> Arc { + fn histogram(&self, mut params: MetricParameters) -> Histogram { params.name = (self.prefix.clone() + &*params.name).into(); self.meter.histogram(params) } - fn histogram_f64(&self, mut params: MetricParameters) -> Arc { + fn histogram_f64(&self, mut params: MetricParameters) -> HistogramF64 { params.name = (self.prefix.clone() + &*params.name).into(); self.meter.histogram_f64(params) } - fn histogram_duration(&self, mut params: MetricParameters) -> Arc { + fn histogram_duration(&self, mut params: MetricParameters) -> HistogramDuration { params.name = (self.prefix.clone() + &*params.name).into(); self.meter.histogram_duration(params) } - fn gauge(&self, mut params: MetricParameters) -> Arc { + fn gauge(&self, mut params: MetricParameters) -> Gauge { params.name = (self.prefix.clone() + &*params.name).into(); self.meter.gauge(params) } - fn gauge_f64(&self, mut params: MetricParameters) -> Arc { + fn gauge_f64(&self, mut params: MetricParameters) -> GaugeF64 { params.name = (self.prefix.clone() + &*params.name).into(); self.meter.gauge_f64(params) } @@ -1002,6 +1156,7 @@ mod tests { => populate_into ); a2.set(Arc::new(DummyCustomAttrs(2))).unwrap(); + dbg!(&events); assert_matches!( &events[1], MetricEvent::Update { diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index e4ad7c394..57be571ea 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -5,18 +5,25 @@ mod log_export; pub(crate) mod metrics; #[cfg(feature = "otel")] mod otel; -#[cfg(feature = "otel")] +#[cfg(feature = "prom")] +mod prometheus_meter; +#[cfg(feature = "prom")] mod prometheus_server; +// Always export bucket configuration function since it's used by both OTel and Prometheus +pub use metrics::default_buckets_for; + #[cfg(feature = "otel")] pub use metrics::{ ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, MetricsCallBuffer, WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME, WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME, WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME, - WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, default_buckets_for, + WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, }; #[cfg(feature = "otel")] -pub use otel::{build_otlp_metric_exporter, start_prometheus_metric_exporter}; +pub use otel::build_otlp_metric_exporter; +#[cfg(feature = "prom")] +pub use prometheus_server::start_prometheus_metric_exporter; pub use log_export::{CoreLogBuffer, CoreLogBufferedConsumer, CoreLogStreamConsumer}; diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs index bd8a29569..410e63a83 100644 --- a/core/src/telemetry/otel.rs +++ b/core/src/telemetry/otel.rs @@ -7,7 +7,6 @@ use super::{ WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME, WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, }, - prometheus_server::PromServer, }; use crate::{abstractions::dbg_panic, telemetry::metrics::DEFAULT_S_BUCKETS}; use opentelemetry::{ @@ -16,38 +15,43 @@ use opentelemetry::{ }; use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig}; use opentelemetry_sdk::{ - Resource, + Resource, metrics, metrics::{ - Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, MetricError, PeriodicReader, - SdkMeterProvider, Temporality, View, new_view, + Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, PeriodicReader, + SdkMeterProvider, Temporality, }, }; -use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use temporal_sdk_core_api::telemetry::{ HistogramBucketOverrides, MetricTemporality, OtelCollectorOptions, OtlpProtocol, - PrometheusExporterOptions, metrics::{ - CoreMeter, Counter, Gauge, GaugeF64, Histogram, HistogramDuration, HistogramF64, + CoreMeter, Counter, Gauge, GaugeF64, Histogram, HistogramBase, HistogramDuration, + HistogramDurationBase, HistogramF64, HistogramF64Base, MetricAttributable, MetricAttributes, MetricParameters, NewAttributes, }, }; -use tokio::task::AbortHandle; use tonic::{metadata::MetadataMap, transport::ClientTlsConfig}; -/// A specialized `Result` type for metric operations. -type Result = std::result::Result; - -fn histo_view(metric_name: &'static str, use_seconds: bool) -> Result> { - let buckets = default_buckets_for(metric_name, use_seconds); - new_view( - Instrument::new().name(format!("*{metric_name}")), - opentelemetry_sdk::metrics::Stream::new().aggregation( - Aggregation::ExplicitBucketHistogram { - boundaries: buckets.to_vec(), - record_min_max: true, - }, - ), - ) +fn histo_view( + metric_name: &'static str, + use_seconds: bool, +) -> impl Fn(&Instrument) -> Option + Send + Sync + 'static { + let buckets = default_buckets_for(metric_name, use_seconds).to_vec(); + move |ins: &Instrument| { + if ins.name().ends_with(metric_name) { + Some( + metrics::Stream::builder() + .with_aggregation(Aggregation::ExplicitBucketHistogram { + boundaries: buckets.clone(), + record_min_max: true, + }) + .build() + .expect("Hardcoded metric stream always builds"), + ) + } else { + None + } + } } pub(super) fn augment_meter_provider_with_defaults( @@ -55,68 +59,73 @@ pub(super) fn augment_meter_provider_with_defaults( global_tags: &HashMap, use_seconds: bool, bucket_overrides: HistogramBucketOverrides, -) -> Result { +) -> Result { for (name, buckets) in bucket_overrides.overrides { - mpb = mpb.with_view(new_view( - Instrument::new().name(format!("*{name}")), - opentelemetry_sdk::metrics::Stream::new().aggregation( - Aggregation::ExplicitBucketHistogram { - boundaries: buckets, - record_min_max: true, - }, - ), - )?) + mpb = mpb.with_view(move |ins: &Instrument| { + if ins.name().contains(&name) { + Some( + metrics::Stream::builder() + .with_aggregation(Aggregation::ExplicitBucketHistogram { + boundaries: buckets.clone(), + record_min_max: true, + }) + .build() + .expect("Hardcoded metric stream always builds"), + ) + } else { + None + } + }); } let mut mpb = mpb - .with_view(histo_view( - WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME, - use_seconds, - )?) + .with_view(histo_view(WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME, use_seconds)) .with_view(histo_view( WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME, use_seconds, - )?) + )) .with_view(histo_view( WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME, use_seconds, - )?) + )) .with_view(histo_view( WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, use_seconds, - )?) + )) .with_view(histo_view( ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, use_seconds, - )?) + )) .with_view(histo_view( ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, use_seconds, - )?); + )); // Fallback default - mpb = mpb.with_view(new_view( - { - let mut i = Instrument::new(); - i.kind = Some(InstrumentKind::Histogram); - i - }, - opentelemetry_sdk::metrics::Stream::new().aggregation( - Aggregation::ExplicitBucketHistogram { - boundaries: if use_seconds { - DEFAULT_S_BUCKETS.to_vec() - } else { - DEFAULT_MS_BUCKETS.to_vec() - }, - record_min_max: true, - }, - ), - )?); + mpb = mpb.with_view(move |ins: &Instrument| { + if ins.kind() == InstrumentKind::Histogram { + Some( + metrics::Stream::builder() + .with_aggregation(Aggregation::ExplicitBucketHistogram { + boundaries: if use_seconds { + DEFAULT_S_BUCKETS.to_vec() + } else { + DEFAULT_MS_BUCKETS.to_vec() + }, + record_min_max: true, + }) + .build() + .expect("Hardcoded metric stream always builds"), + ) + } else { + None + } + }); Ok(mpb.with_resource(default_resource(global_tags))) } /// Create an OTel meter that can be used as a [CoreMeter] to export metrics over OTLP. pub fn build_otlp_metric_exporter( opts: OtelCollectorOptions, -) -> std::result::Result { +) -> Result { let exporter = match opts.protocol { OtlpProtocol::Grpc => { let mut exporter = opentelemetry_otlp::MetricExporter::builder() @@ -154,40 +163,6 @@ pub fn build_otlp_metric_exporter( }) } -pub struct StartedPromServer { - pub meter: Arc, - pub bound_addr: SocketAddr, - pub abort_handle: AbortHandle, -} - -/// Builds and runs a prometheus endpoint which can be scraped by prom instances for metrics export. -/// Returns the meter that can be used as a [CoreMeter]. -/// -/// Requires a Tokio runtime to exist, and will block briefly while binding the server endpoint. -pub fn start_prometheus_metric_exporter( - opts: PrometheusExporterOptions, -) -> std::result::Result { - let (srv, exporter) = PromServer::new(&opts)?; - let meter_provider = augment_meter_provider_with_defaults( - MeterProviderBuilder::default().with_reader(exporter), - &opts.global_tags, - opts.use_seconds_for_durations, - opts.histogram_bucket_overrides, - )? - .build(); - let bound_addr = srv.bound_addr()?; - let handle = tokio::spawn(async move { srv.run().await }); - Ok(StartedPromServer { - meter: Arc::new(CoreOtelMeter { - meter: meter_provider.meter(TELEM_SERVICE_NAME), - use_seconds_for_durations: opts.use_seconds_for_durations, - _mp: meter_provider, - }), - bound_addr, - abort_handle: handle.abort_handle(), - }) -} - #[derive(Debug)] pub struct CoreOtelMeter { pub meter: Meter, @@ -218,81 +193,109 @@ impl CoreMeter for CoreOtelMeter { } } - fn counter(&self, params: MetricParameters) -> Arc { - Arc::new( + fn counter(&self, params: MetricParameters) -> Counter { + Counter::new(Arc::new( self.meter .u64_counter(params.name) .with_unit(params.unit) .with_description(params.description) .build(), - ) + )) } - fn histogram(&self, params: MetricParameters) -> Arc { - Arc::new( - self.meter - .u64_histogram(params.name) - .with_unit(params.unit) - .with_description(params.description) - .build(), - ) + fn histogram(&self, params: MetricParameters) -> Histogram { + Histogram::new(Arc::new(self.create_histogram(params))) } - fn histogram_f64(&self, params: MetricParameters) -> Arc { - Arc::new( - self.meter - .f64_histogram(params.name) - .with_unit(params.unit) - .with_description(params.description) - .build(), - ) + fn histogram_f64(&self, params: MetricParameters) -> HistogramF64 { + HistogramF64::new(Arc::new(self.create_histogram_f64(params))) } - fn histogram_duration(&self, mut params: MetricParameters) -> Arc { - Arc::new(if self.use_seconds_for_durations { + fn histogram_duration(&self, mut params: MetricParameters) -> HistogramDuration { + HistogramDuration::new(Arc::new(if self.use_seconds_for_durations { params.unit = "s".into(); - DurationHistogram::Seconds(self.histogram_f64(params)) + DurationHistogram::Seconds(self.create_histogram_f64(params)) } else { params.unit = "ms".into(); - DurationHistogram::Milliseconds(self.histogram(params)) - }) + DurationHistogram::Milliseconds(self.create_histogram(params)) + })) } - fn gauge(&self, params: MetricParameters) -> Arc { - Arc::new( + fn gauge(&self, params: MetricParameters) -> Gauge { + Gauge::new(Arc::new( self.meter .u64_gauge(params.name) .with_unit(params.unit) .with_description(params.description) .build(), - ) + )) } - fn gauge_f64(&self, params: MetricParameters) -> Arc { - Arc::new( + fn gauge_f64(&self, params: MetricParameters) -> GaugeF64 { + GaugeF64::new(Arc::new( self.meter .f64_gauge(params.name) .with_unit(params.unit) .with_description(params.description) .build(), - ) + )) + } +} + +impl CoreOtelMeter { + fn create_histogram(&self, params: MetricParameters) -> opentelemetry::metrics::Histogram { + self.meter + .u64_histogram(params.name) + .with_unit(params.unit) + .with_description(params.description) + .build() + } + + fn create_histogram_f64( + &self, + params: MetricParameters, + ) -> opentelemetry::metrics::Histogram { + self.meter + .f64_histogram(params.name) + .with_unit(params.unit) + .with_description(params.description) + .build() } } -/// A histogram being used to record durations. -#[derive(Clone)] enum DurationHistogram { - Milliseconds(Arc), - Seconds(Arc), + Milliseconds(opentelemetry::metrics::Histogram), + Seconds(opentelemetry::metrics::Histogram), +} + +enum DurationHistogramBase { + Millis(Box), + Secs(Box), } -impl HistogramDuration for DurationHistogram { - fn record(&self, value: Duration, attributes: &MetricAttributes) { + +impl HistogramDurationBase for DurationHistogramBase { + fn records(&self, value: Duration) { match self { - DurationHistogram::Milliseconds(h) => h.record(value.as_millis() as u64, attributes), - DurationHistogram::Seconds(h) => h.record(value.as_secs_f64(), attributes), + DurationHistogramBase::Millis(h) => h.records(value.as_millis() as u64), + DurationHistogramBase::Secs(h) => h.records(value.as_secs_f64()), } } } +impl MetricAttributable> for DurationHistogram { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + Ok(match self { + DurationHistogram::Milliseconds(h) => Box::new(DurationHistogramBase::Millis( + h.with_attributes(attributes)?, + )), + DurationHistogram::Seconds(h) => { + Box::new(DurationHistogramBase::Secs(h.with_attributes(attributes)?)) + } + }) + } +} fn default_resource_instance() -> &'static Resource { use std::sync::OnceLock; diff --git a/core/src/telemetry/prometheus_meter.rs b/core/src/telemetry/prometheus_meter.rs new file mode 100644 index 000000000..99e85f333 --- /dev/null +++ b/core/src/telemetry/prometheus_meter.rs @@ -0,0 +1,851 @@ +use crate::{abstractions::dbg_panic, telemetry::default_buckets_for}; +use anyhow::anyhow; +use parking_lot::RwLock; +use prometheus::{ + GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec, Opts, + core::{Collector, Desc, GenericCounter}, + proto::{LabelPair, MetricFamily}, +}; +use std::{ + collections::{BTreeMap, HashMap, HashSet, btree_map, hash_map}, + fmt::{Debug, Formatter}, + sync::Arc, + time::Duration, +}; +use temporal_sdk_core_api::telemetry::metrics::{ + CoreMeter, Counter, CounterBase, Gauge, GaugeBase, GaugeF64, GaugeF64Base, Histogram, + HistogramBase, HistogramDuration, HistogramDurationBase, HistogramF64, HistogramF64Base, + MetricAttributable, MetricAttributes, MetricParameters, NewAttributes, OrderedMetricLabelSet, +}; + +#[derive(derive_more::From, derive_more::TryInto, Debug, Clone)] +enum PromCollector { + Histo(HistogramVec), + Counter(IntCounterVec), + Gauge(IntGaugeVec), + GaugeF64(GaugeVec), +} + +impl Collector for PromCollector { + fn desc(&self) -> Vec<&Desc> { + match self { + PromCollector::Histo(v) => v.desc(), + PromCollector::Counter(v) => v.desc(), + PromCollector::Gauge(v) => v.desc(), + PromCollector::GaugeF64(v) => v.desc(), + } + } + + fn collect(&self) -> Vec { + match self { + PromCollector::Histo(v) => v.collect(), + PromCollector::Counter(v) => v.collect(), + PromCollector::Gauge(v) => v.collect(), + PromCollector::GaugeF64(v) => v.collect(), + } + } +} + +/// Replaces Prometheus's default registry with a custom one that allows us to register metrics that +/// have different label sets for the same name. +#[derive(Clone)] +pub(super) struct Registry { + collectors_by_id: Arc>>, + global_tags: BTreeMap, +} + +// A lot of this implementation code is lifted from the prometheus crate itself, and as such is a +// derivative work of the following: +// Copyright 2014 The Prometheus Authors +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +impl Registry { + pub(super) fn new(global_tags: HashMap) -> Self { + Self { + collectors_by_id: Arc::new(RwLock::new(HashMap::new())), + global_tags: BTreeMap::from_iter(global_tags), + } + } + + // Register a collector, potentially returning an existing collector that matches the provided + // one. In such cases the passed-in collector is discarded. + fn register>(&self, c: T) -> Option { + let mut desc_id_set = HashSet::new(); + let mut collector_id: u64 = 0; + let c = c.into(); + + for desc in c.desc() { + // If it is not a duplicate desc in this collector, add it to + // the collector_id. Here we assume that collectors (ie: metric vecs / histograms etc) + // should internally not repeat the same descriptor -- even though we allow entirely + // separate metrics with overlapping labels to be registered generally. + if desc_id_set.insert(desc.id) { + // Add the id and the dim hash, which includes both static and variable labels + collector_id = collector_id + .wrapping_add(desc.id) + .wrapping_add(desc.dim_hash); + } else { + dbg_panic!( + "Prometheus metric has duplicate descriptors, values may not be recorded on \ + this metric. This is an SDK bug. Details: {:?}", + c.desc(), + ); + return None; + } + } + match self.collectors_by_id.write().entry(collector_id) { + hash_map::Entry::Vacant(vc) => { + vc.insert(c); + None + } + hash_map::Entry::Occupied(o) => Some(o.get().clone()), + } + } + + pub(super) fn gather(&self) -> Vec { + let mut mf_by_name = BTreeMap::new(); + + for c in self.collectors_by_id.read().values() { + let mfs = c.collect(); + for mut mf in mfs { + if mf.get_metric().is_empty() { + continue; + } + + let name = mf.name().to_owned(); + match mf_by_name.entry(name) { + btree_map::Entry::Vacant(entry) => { + entry.insert(mf); + } + btree_map::Entry::Occupied(mut entry) => { + let existent_mf = entry.get_mut(); + let existent_metrics = existent_mf.mut_metric(); + for metric in mf.take_metric().into_iter() { + existent_metrics.push(metric); + } + } + } + } + } + + // Now that MetricFamilies are all set, sort their Metrics + // lexicographically by their label values. + for mf in mf_by_name.values_mut() { + mf.mut_metric().sort_by(|m1, m2| { + let lps1 = m1.get_label(); + let lps2 = m2.get_label(); + + if lps1.len() != lps2.len() { + return lps1.len().cmp(&lps2.len()); + } + + for (lp1, lp2) in lps1.iter().zip(lps2.iter()) { + if lp1.value() != lp2.value() { + return lp1.value().cmp(lp2.value()); + } + } + + // We should never arrive here. Multiple metrics with the same + // label set in the same scrape will lead to undefined ingestion + // behavior. However, as above, we have to provide stable sorting + // here, even for inconsistent metrics. So sort equal metrics + // by their timestamp, with missing timestamps (implying "now") + // coming last. + m1.timestamp_ms().cmp(&m2.timestamp_ms()) + }); + } + + mf_by_name + .into_values() + .map(|mut m| { + if self.global_tags.is_empty() { + return m; + } + // Add global labels + let pairs: Vec = self + .global_tags + .iter() + .map(|(k, v)| { + let mut label = LabelPair::default(); + label.set_name(k.to_string()); + label.set_value(v.to_string()); + label + }) + .collect(); + + for metric in m.mut_metric().iter_mut() { + let mut labels: Vec<_> = metric.take_label(); + labels.append(&mut pairs.clone()); + metric.set_label(labels); + } + m + }) + .collect() + } +} + +impl Debug for Registry { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Registry({} collectors)", + self.collectors_by_id.read().keys().len() + ) + } +} + +#[derive(Debug)] +struct PromMetric { + metric_name: String, + metric_description: String, + registry: Registry, + /// Map from label schema to the corresponding Prometheus vector metric + vectors: RwLock, T>>, + /// Bucket configuration for histograms (None for other metric types) + histogram_buckets: Option>, +} + +impl PromMetric +where + T: Clone + Into + TryFrom + 'static, +{ + fn new(metric_name: String, metric_description: String, registry: Registry) -> Self { + Self { + metric_name, + metric_description, + registry, + vectors: RwLock::new(HashMap::new()), + histogram_buckets: None, + } + } + + /// Generic double-checked locking pattern for vector creation + fn get_or_create_vector( + &self, + kvs: &OrderedMetricLabelSet, + create_fn: F, + ) -> anyhow::Result + where + F: FnOnce(&str, &str, &[&str]) -> T, + { + // Just the metric label names + let mut schema: Vec = kvs.keys_ordered().map(|kv| kv.to_string()).collect(); + schema.sort(); + + { + let vectors = self.vectors.read(); + if let Some(vector) = vectors.get(&schema) { + return Ok(vector.clone()); + } + } + + let mut vectors = self.vectors.write(); + // Double-check in case another thread created it + if let Some(vector) = vectors.get(&schema) { + return Ok(vector.clone()); + } + + let description = if self.metric_description.is_empty() { + &self.metric_name + } else { + &self.metric_description + }; + + let boxed: Box<[&str]> = schema.iter().map(String::as_str).collect(); + let vector = create_fn(&self.metric_name, description, &boxed); + + let maybe_exists = self.registry.register(vector.clone()); + let vector = if let Some(m) = maybe_exists { + T::try_from(m).map_err(|_| { + anyhow!( + "Tried to register a metric that already exists as a different type: {:?}", + self.metric_name + ) + })? + } else { + vector + }; + + vectors.insert(schema, vector.clone()); + Ok(vector) + } +} + +impl PromMetric { + fn new_with_buckets( + metric_name: String, + metric_description: String, + registry: Registry, + buckets: Vec, + ) -> Self { + Self { + metric_name, + metric_description, + registry, + vectors: RwLock::new(HashMap::new()), + histogram_buckets: Some(buckets), + } + } + + fn get_or_create_vector_with_buckets( + &self, + labels: &OrderedMetricLabelSet, + ) -> anyhow::Result { + self.get_or_create_vector(labels, |name, desc, label_names| { + let mut opts = prometheus::HistogramOpts::new(name, desc); + if let Some(buckets) = &self.histogram_buckets { + opts = opts.buckets(buckets.clone()); + } + HistogramVec::new(opts, label_names).unwrap() + }) + } +} + +static EMPTY_LABEL_SET: OrderedMetricLabelSet = OrderedMetricLabelSet { + attributes: BTreeMap::new(), +}; + +impl PromMetric +where + T: Clone + Collector + 'static, +{ + fn extract_prometheus_labels<'a>( + &self, + attributes: &'a MetricAttributes, + ) -> anyhow::Result<&'a OrderedMetricLabelSet, anyhow::Error> { + if matches!(attributes, MetricAttributes::Empty) { + return Ok(&EMPTY_LABEL_SET); + } + if let MetricAttributes::Prometheus { labels } = attributes { + Ok(labels) + } else { + let e = anyhow!( + "Must use Prometheus attributes with a Prometheus metric implementation. Got: {:?}", + attributes + ); + dbg_panic!("{:?}", e); + Err(e) + } + } + + fn label_mismatch_err(&self, attributes: &MetricAttributes) -> anyhow::Error { + let e = anyhow!( + "Mismatch between expected # of prometheus labels and provided for metric {}. \ + This is an SDK bug. Attributes: {:?}", + self.metric_name, + attributes, + ); + dbg_panic!("{:?}", e); + e + } +} + +struct CorePromCounter(GenericCounter); +impl CounterBase for CorePromCounter { + fn adds(&self, value: u64) { + self.0.inc_by(value); + } +} +impl MetricAttributable> for PromMetric { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + let labels = self.extract_prometheus_labels(attributes)?; + let vector = self.get_or_create_vector(labels, |name, desc, label_names| { + let opts = Opts::new(name, desc); + IntCounterVec::new(opts, label_names).unwrap() + })?; + if let Ok(c) = vector.get_metric_with(&labels.as_prom_labels()) { + Ok(Box::new(CorePromCounter(c))) + } else { + Err(self.label_mismatch_err(attributes).into()) + } + } +} + +struct CorePromIntGauge(prometheus::IntGauge); +impl GaugeBase for CorePromIntGauge { + fn records(&self, value: u64) { + self.0.set(value as i64); + } +} +impl MetricAttributable> for PromMetric { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + let labels = self.extract_prometheus_labels(attributes)?; + let vector = self.get_or_create_vector(labels, |name, desc, label_names| { + let opts = Opts::new(name, desc); + IntGaugeVec::new(opts, label_names).unwrap() + })?; + if let Ok(g) = vector.get_metric_with(&labels.as_prom_labels()) { + Ok(Box::new(CorePromIntGauge(g))) + } else { + Err(self.label_mismatch_err(attributes).into()) + } + } +} + +struct CorePromGauge(prometheus::Gauge); +impl GaugeF64Base for CorePromGauge { + fn records(&self, value: f64) { + self.0.set(value); + } +} +impl MetricAttributable> for PromMetric { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + let labels = self.extract_prometheus_labels(attributes)?; + let vector = self.get_or_create_vector(labels, |name, desc, label_names| { + let opts = Opts::new(name, desc); + GaugeVec::new(opts, label_names).unwrap() + })?; + if let Ok(g) = vector.get_metric_with(&labels.as_prom_labels()) { + Ok(Box::new(CorePromGauge(g))) + } else { + Err(self.label_mismatch_err(attributes).into()) + } + } +} + +#[derive(Clone)] +struct CorePromHistogram(prometheus::Histogram); +impl HistogramBase for CorePromHistogram { + fn records(&self, value: u64) { + self.0.observe(value as f64); + } +} +impl HistogramF64Base for CorePromHistogram { + fn records(&self, value: f64) { + self.0.observe(value); + } +} + +#[derive(Debug)] +struct PromHistogramU64(PromMetric); +impl MetricAttributable> for PromHistogramU64 { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + let labels = self.0.extract_prometheus_labels(attributes)?; + let vector = self.0.get_or_create_vector_with_buckets(labels)?; + if let Ok(h) = vector.get_metric_with(&labels.as_prom_labels()) { + Ok(Box::new(CorePromHistogram(h))) + } else { + Err(self.0.label_mismatch_err(attributes).into()) + } + } +} + +#[derive(Debug)] +struct PromHistogramF64(PromMetric); +impl MetricAttributable> for PromHistogramF64 { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + let labels = self.0.extract_prometheus_labels(attributes)?; + let vector = self.0.get_or_create_vector_with_buckets(labels)?; + if let Ok(h) = vector.get_metric_with(&labels.as_prom_labels()) { + Ok(Box::new(CorePromHistogram(h))) + } else { + Err(self.0.label_mismatch_err(attributes).into()) + } + } +} + +/// A CoreMeter implementation backed by Prometheus metrics with dynamic label management +#[derive(Debug)] +pub struct CorePrometheusMeter { + registry: Registry, + use_seconds_for_durations: bool, + unit_suffix: bool, + bucket_overrides: temporal_sdk_core_api::telemetry::HistogramBucketOverrides, +} + +impl CorePrometheusMeter { + pub(super) fn new( + registry: Registry, + use_seconds_for_durations: bool, + unit_suffix: bool, + bucket_overrides: temporal_sdk_core_api::telemetry::HistogramBucketOverrides, + ) -> Self { + Self { + registry, + use_seconds_for_durations, + unit_suffix, + bucket_overrides, + } + } + + fn create_u64_hist(&self, params: &MetricParameters) -> PromHistogramU64 { + let base_name = params.name.to_string(); + let actual_metric_name = self.get_histogram_metric_name(&base_name, ¶ms.unit); + let buckets = self.get_buckets_for_metric(&base_name); + PromHistogramU64(PromMetric::new_with_buckets( + actual_metric_name, + params.description.to_string(), + self.registry.clone(), + buckets, + )) + } + + fn create_f64_hist(&self, params: &MetricParameters) -> PromHistogramF64 { + let base_name = params.name.to_string(); + let actual_metric_name = self.get_histogram_metric_name(&base_name, ¶ms.unit); + let buckets = self.get_buckets_for_metric(&base_name); + PromHistogramF64(PromMetric::new_with_buckets( + actual_metric_name, + params.description.to_string(), + self.registry.clone(), + buckets, + )) + } +} + +impl CoreMeter for CorePrometheusMeter { + fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes { + MetricAttributes::Prometheus { + labels: Arc::new(attribs.into()), + } + } + + fn extend_attributes( + &self, + existing: MetricAttributes, + new: NewAttributes, + ) -> MetricAttributes { + if let MetricAttributes::Prometheus { + labels: existing_labels, + } = existing + { + let mut all_labels = Arc::unwrap_or_clone(existing_labels); + for kv in new.attributes.into_iter() { + all_labels.attributes.insert(kv.key, kv.value); + } + MetricAttributes::Prometheus { + labels: Arc::new(all_labels), + } + } else { + dbg_panic!("Must use Prometheus attributes with a Prometheus metric implementation"); + self.new_attributes(new) + } + } + + fn counter(&self, params: MetricParameters) -> Counter { + let metric_name = params.name.to_string(); + Counter::new(Arc::new(PromMetric::::new( + metric_name, + params.description.to_string(), + self.registry.clone(), + ))) + } + + fn histogram(&self, params: MetricParameters) -> Histogram { + let hist = self.create_u64_hist(¶ms); + Histogram::new(Arc::new(hist)) + } + + fn histogram_f64(&self, params: MetricParameters) -> HistogramF64 { + let hist = self.create_f64_hist(¶ms); + HistogramF64::new(Arc::new(hist)) + } + + fn histogram_duration(&self, mut params: MetricParameters) -> HistogramDuration { + HistogramDuration::new(Arc::new(if self.use_seconds_for_durations { + params.unit = "seconds".into(); + DurationHistogram::Seconds(self.create_f64_hist(¶ms)) + } else { + params.unit = "milliseconds".into(); + DurationHistogram::Milliseconds(self.create_u64_hist(¶ms)) + })) + } + + fn gauge(&self, params: MetricParameters) -> Gauge { + let metric_name = params.name.to_string(); + Gauge::new(Arc::new(PromMetric::::new( + metric_name, + params.description.to_string(), + self.registry.clone(), + ))) + } + + fn gauge_f64(&self, params: MetricParameters) -> GaugeF64 { + let metric_name = params.name.to_string(); + GaugeF64::new(Arc::new(PromMetric::::new( + metric_name, + params.description.to_string(), + self.registry.clone(), + ))) + } +} + +impl CorePrometheusMeter { + fn get_buckets_for_metric(&self, metric_name: &str) -> Vec { + for (name_pattern, buckets) in &self.bucket_overrides.overrides { + if metric_name.contains(name_pattern) { + return buckets.clone(); + } + } + let base_metric_name = metric_name.strip_prefix("temporal_").unwrap_or(metric_name); + default_buckets_for(base_metric_name, self.use_seconds_for_durations).to_vec() + } + + fn get_histogram_metric_name(&self, base_name: &str, unit: &str) -> String { + if self.unit_suffix && !unit.is_empty() { + format!("{}_{}", base_name, unit) + } else { + base_name.to_string() + } + } +} + +enum DurationHistogram { + Milliseconds(PromHistogramU64), + Seconds(PromHistogramF64), +} + +enum DurationHistogramBase { + Millis(Box), + Secs(Box), +} + +impl HistogramDurationBase for DurationHistogramBase { + fn records(&self, value: Duration) { + match self { + DurationHistogramBase::Millis(h) => h.records(value.as_millis() as u64), + DurationHistogramBase::Secs(h) => h.records(value.as_secs_f64()), + } + } +} +impl MetricAttributable> for DurationHistogram { + fn with_attributes( + &self, + attributes: &MetricAttributes, + ) -> Result, Box> { + Ok(match self { + DurationHistogram::Milliseconds(h) => Box::new(DurationHistogramBase::Millis( + h.with_attributes(attributes)?, + )), + DurationHistogram::Seconds(h) => { + Box::new(DurationHistogramBase::Secs(h.with_attributes(attributes)?)) + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::telemetry::{TelemetryInstance, metrics::MetricsContext}; + use prometheus::{Encoder, TextEncoder}; + use temporal_sdk_core_api::telemetry::{ + METRIC_PREFIX, + metrics::{MetricKeyValue, NewAttributes}, + }; + + #[test] + fn test_prometheus_meter_dynamic_labels() { + let registry = Registry::new(HashMap::from([("global".to_string(), "value".to_string())])); + let meter = CorePrometheusMeter::new( + registry.clone(), + false, + false, + temporal_sdk_core_api::telemetry::HistogramBucketOverrides::default(), + ); + + let counter = meter.counter(MetricParameters { + name: "test_counter".into(), + description: "A test counter metric".into(), + unit: "".into(), + }); + + let attrs1 = meter.new_attributes(NewAttributes::new(vec![ + MetricKeyValue::new("service", "service1"), + MetricKeyValue::new("method", "get"), + ])); + counter.add(5, &attrs1); + + let attrs2 = meter.new_attributes(NewAttributes::new(vec![ + MetricKeyValue::new("service", "service2"), + MetricKeyValue::new("method", "post"), + ])); + counter.add(3, &attrs2); + + let output = output_string(®istry); + + // Both label combinations should be present + assert!( + output.contains("test_counter{method=\"get\",service=\"service1\",global=\"value\"} 5") + ); + assert!( + output + .contains("test_counter{method=\"post\",service=\"service2\",global=\"value\"} 3") + ); + } + + #[test] + fn test_extend_attributes() { + let registry = Registry::new(HashMap::new()); + let meter = CorePrometheusMeter::new( + registry.clone(), + false, + false, + temporal_sdk_core_api::telemetry::HistogramBucketOverrides::default(), + ); + + let base_attrs = meter.new_attributes(NewAttributes::new(vec![ + MetricKeyValue::new("service", "my_service"), + MetricKeyValue::new("version", "1.0"), + ])); + + let extended_attrs = meter.extend_attributes( + base_attrs, + NewAttributes::new(vec![ + MetricKeyValue::new("method", "GET"), + MetricKeyValue::new("version", "2.0"), // This should override + ]), + ); + + let counter = meter.counter(MetricParameters { + name: "test_extended".into(), + description: "Test extended attributes".into(), + unit: "".into(), + }); + counter.add(1, &extended_attrs); + + let output = output_string(®istry); + + assert!(output.contains("service=\"my_service\"")); + assert!(output.contains("method=\"GET\"")); + assert!(output.contains("version=\"2.0\"")); + assert!(!output.contains("version=\"1.0\"")); + } + + #[test] + fn test_workflow_e2e_latency_buckets() { + let registry = Registry::new(HashMap::new()); + + let meter_ms = CorePrometheusMeter::new( + registry.clone(), + false, + false, + temporal_sdk_core_api::telemetry::HistogramBucketOverrides::default(), + ); + + let histogram_ms = meter_ms.histogram_duration(MetricParameters { + name: format!( + "temporal_{}", + crate::telemetry::WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME + ) + .into(), + description: "Test workflow e2e latency".into(), + unit: "duration".into(), + }); + let attrs = meter_ms.new_attributes(NewAttributes::new(vec![])); + histogram_ms.record(Duration::from_millis(100), &attrs); + + let output = output_string(®istry); + + println!("Milliseconds histogram output:\n{}", output); + + assert!( + output.contains("le=\"100\""), + "Missing le=\"100\" bucket in milliseconds output" + ); + + // Test seconds configuration + let registry_s = Registry::new(HashMap::new()); + let meter_s = CorePrometheusMeter::new( + registry_s.clone(), + true, + false, + temporal_sdk_core_api::telemetry::HistogramBucketOverrides::default(), + ); + + let histogram_s = meter_s.histogram_duration(MetricParameters { + name: format!( + "temporal_{}", + crate::telemetry::WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME + ) + .into(), + description: "Test workflow e2e latency".into(), + unit: "duration".into(), + }); + let attrs_s = meter_s.new_attributes(NewAttributes::new(vec![])); + histogram_s.record(Duration::from_millis(100), &attrs_s); + + let output_s = output_string(®istry_s); + + println!("Seconds histogram output:\n{}", output_s); + + assert!( + output_s.contains("le=\"0.1\""), + "Missing le=\"0.1\" bucket in seconds output" + ); + } + + #[test] + fn can_record_with_no_labels() { + let registry = Registry::new(HashMap::new()); + let meter = CorePrometheusMeter::new( + registry.clone(), + false, + false, + temporal_sdk_core_api::telemetry::HistogramBucketOverrides::default(), + ); + let counter = meter.counter(MetricParameters { + name: "no_labels".into(), + description: "No labels".into(), + unit: "".into(), + }); + counter.adds(1); + + let output = output_string(®istry); + + assert!(output.contains("no_labels 1")); + } + + #[test] + fn works_with_recreated_metrics_context() { + let registry = Registry::new(HashMap::new()); + let meter = CorePrometheusMeter::new( + registry.clone(), + false, + false, + temporal_sdk_core_api::telemetry::HistogramBucketOverrides::default(), + ); + let telem_instance = TelemetryInstance::new( + None, + None, + METRIC_PREFIX.to_string(), + Some(Arc::new(meter)), + true, + ); + let mc = MetricsContext::top_level("foo".to_string(), "q".to_string(), &telem_instance); + mc.worker_registered(); + drop(mc); + + let mc = MetricsContext::top_level("foo".to_string(), "q".to_string(), &telem_instance); + mc.worker_registered(); + + let mc = MetricsContext::top_level("foo".to_string(), "q2".to_string(), &telem_instance); + mc.worker_registered(); + + let output = output_string(®istry); + assert!(output.contains("temporal_worker_start{namespace=\"foo\",service_name=\"temporal-core-sdk\",task_queue=\"q\"} 2")); + assert!(output.contains("temporal_worker_start{namespace=\"foo\",service_name=\"temporal-core-sdk\",task_queue=\"q2\"} 1")); + } + + fn output_string(registry: &Registry) -> String { + let metric_families = registry.gather(); + let encoder = TextEncoder::new(); + let mut buffer = vec![]; + encoder.encode(&metric_families, &mut buffer).unwrap(); + String::from_utf8(buffer).unwrap() + } +} diff --git a/core/src/telemetry/prometheus_server.rs b/core/src/telemetry/prometheus_server.rs index 46a732378..74ae5a114 100644 --- a/core/src/telemetry/prometheus_server.rs +++ b/core/src/telemetry/prometheus_server.rs @@ -1,14 +1,48 @@ +use crate::telemetry::prometheus_meter::Registry; use http_body_util::Full; use hyper::{Method, Request, Response, body::Bytes, header::CONTENT_TYPE, service::service_fn}; use hyper_util::{ rt::{TokioExecutor, TokioIo}, server::conn::auto, }; -use opentelemetry_prometheus::PrometheusExporter; -use prometheus::{Encoder, Registry, TextEncoder}; -use std::net::{SocketAddr, TcpListener}; +use prometheus::{Encoder, TextEncoder}; +use std::{ + net::{SocketAddr, TcpListener}, + sync::Arc, +}; use temporal_sdk_core_api::telemetry::PrometheusExporterOptions; -use tokio::io; +use tokio::{io, task::AbortHandle}; + +pub struct StartedPromServer { + pub meter: Arc, + pub bound_addr: SocketAddr, + pub abort_handle: AbortHandle, +} + +/// Builds and runs a prometheus endpoint which can be scraped by prom instances for metrics export. +/// Returns the meter that can be used as a [CoreMeter]. +/// +/// Requires a Tokio runtime to exist, and will block briefly while binding the server endpoint. +pub fn start_prometheus_metric_exporter( + opts: PrometheusExporterOptions, +) -> Result { + let srv = PromServer::new(&opts)?; + let meter = Arc::new( + crate::telemetry::prometheus_meter::CorePrometheusMeter::new( + srv.registry().clone(), + opts.use_seconds_for_durations, + opts.unit_suffix, + opts.histogram_bucket_overrides, + ), + ); + let bound_addr = srv.bound_addr()?; + let handle = tokio::spawn(async move { srv.run().await }); + Ok(StartedPromServer { + meter, + bound_addr, + abort_handle: handle.abort_handle(), + }) +} /// Exposes prometheus metrics for scraping pub(super) struct PromServer { @@ -17,30 +51,16 @@ pub(super) struct PromServer { } impl PromServer { - pub(super) fn new( - opts: &PrometheusExporterOptions, - ) -> Result<(Self, PrometheusExporter), anyhow::Error> { - let registry = Registry::new(); - let exporter = opentelemetry_prometheus::exporter() - .without_scope_info() - .with_registry(registry.clone()); - let exporter = if !opts.counters_total_suffix { - exporter.without_counter_suffixes() - } else { - exporter - }; - let exporter = if !opts.unit_suffix { - exporter.without_units() - } else { - exporter - }; - Ok(( - Self { - listener: TcpListener::bind(opts.socket_addr)?, - registry, - }, - exporter.build()?, - )) + pub(super) fn new(opts: &PrometheusExporterOptions) -> Result { + let registry = Registry::new(opts.global_tags.clone()); + Ok(Self { + listener: TcpListener::bind(opts.socket_addr)?, + registry, + }) + } + + pub(super) fn registry(&self) -> &Registry { + &self.registry } pub(super) async fn run(self) -> Result<(), anyhow::Error> { diff --git a/core/src/worker/tuner/resource_based.rs b/core/src/worker/tuner/resource_based.rs index 99d24da84..173418413 100644 --- a/core/src/worker/tuner/resource_based.rs +++ b/core/src/worker/tuner/resource_based.rs @@ -190,10 +190,10 @@ struct PidControllers { } struct MetricInstruments { attribs: MetricAttributes, - mem_usage: Arc, - cpu_usage: Arc, - mem_pid_output: Arc, - cpu_pid_output: Arc, + mem_usage: GaugeF64, + cpu_usage: GaugeF64, + mem_pid_output: GaugeF64, + cpu_pid_output: GaugeF64, } #[derive(Clone, Copy, Default)] struct LastMetricVals { diff --git a/etc/deps.svg b/etc/deps.svg index 489ecc794..9b157f3f7 100644 --- a/etc/deps.svg +++ b/etc/deps.svg @@ -1,162 +1,162 @@ - - + - + 0 - -temporal-sdk-core + +temporal-sdk-core 1 - -rustfsm + +rustfsm 0->1 - - + + 3 - -temporal-client + +temporal-client 0->3 - - + + 4 - -temporal-sdk-core-protos + +temporal-sdk-core-api - + 0->4 - - + + 5 - -temporal-sdk-core-api + +temporal-sdk-core-protos - + 0->5 - - + + 6 - -temporal-sdk + +temporal-sdk 0->6 - - + + 7 - -temporal-sdk-core-test-utils + +temporal-sdk-core-test-utils 0->7 - - + + 3->4 - - + + - + -5->3 - - +3->5 + + - + -5->4 - - +4->5 + + 6->0 - - + + 6->3 - - + + - + 6->4 - - + + - + 6->5 - - + + 7->0 - - + + 7->3 - - + + - + 7->4 - - + + - + 7->5 - - + + 7->6 - - + + diff --git a/sdk/src/activity_context.rs b/sdk/src/activity_context.rs index d71501a29..804c5b87e 100644 --- a/sdk/src/activity_context.rs +++ b/sdk/src/activity_context.rs @@ -149,10 +149,12 @@ impl ActContext { /// RecordHeartbeat sends heartbeat for the currently executing activity pub fn record_heartbeat(&self, details: Vec) { - self.worker.record_activity_heartbeat(ActivityHeartbeat { - task_token: self.info.task_token.clone(), - details, - }) + if !self.info.is_local { + self.worker.record_activity_heartbeat(ActivityHeartbeat { + task_token: self.info.task_token.clone(), + details, + }) + } } /// Get activity info of the executing activity diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 9fc734a86..9803cff36 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -18,6 +18,7 @@ use parking_lot::Mutex; use prost::Message; use rand::Rng; use std::{ + cell::Cell, convert::TryFrom, env, future::Future, @@ -171,9 +172,16 @@ pub async fn history_from_proto_binary(path_from_root: &str) -> Result = const { Cell::new(false) }; +} static INTEG_TESTS_RT: std::sync::OnceLock = std::sync::OnceLock::new(); -pub fn init_integ_telem() -> &'static CoreRuntime { - INTEG_TESTS_RT.get_or_init(|| { +pub fn init_integ_telem() -> Option<&'static CoreRuntime> { + if DONT_AUTO_INIT_INTEG_TELEM.get() { + return None; + } + Some(INTEG_TESTS_RT.get_or_init(|| { let telemetry_options = get_integ_telem_options(); let rt = CoreRuntime::new_assume_tokio(telemetry_options).expect("Core runtime inits cleanly"); @@ -181,7 +189,7 @@ pub fn init_integ_telem() -> &'static CoreRuntime { let _ = tracing::subscriber::set_global_default(sub); } rt - }) + })) } /// Implements a builder pattern to help integ tests initialize core and create workflows diff --git a/tests/heavy_tests.rs b/tests/heavy_tests.rs index 5d56ed5e1..41e83c2fe 100644 --- a/tests/heavy_tests.rs +++ b/tests/heavy_tests.rs @@ -12,13 +12,15 @@ use std::{ }; use temporal_client::{GetWorkflowResultOpts, WfClientExt, WorkflowClientTrait, WorkflowOptions}; use temporal_sdk::{ActContext, ActivityOptions, WfContext, WorkflowResult}; -use temporal_sdk_core::{ResourceBasedTuner, ResourceSlotOptions}; +use temporal_sdk_core::{CoreRuntime, ResourceBasedTuner, ResourceSlotOptions}; use temporal_sdk_core_api::worker::PollerBehavior; use temporal_sdk_core_protos::{ coresdk::{AsJsonPayloadExt, workflow_commands::ActivityCancellationType}, temporal::api::enums::v1::WorkflowIdReusePolicy, }; -use temporal_sdk_core_test_utils::{CoreWfStarter, rand_6_chars, workflows::la_problem_workflow}; +use temporal_sdk_core_test_utils::{ + CoreWfStarter, init_integ_telem, prom_metrics, rand_6_chars, workflows::la_problem_workflow, +}; mod fuzzy_workflow; @@ -183,7 +185,13 @@ async fn workflow_load() { const SIGNAME: &str = "signame"; let num_workflows = 500; let wf_name = "workflow_load"; - let mut starter = CoreWfStarter::new("workflow_load"); + let (mut telemopts, _, _aborter) = prom_metrics(None); + // Avoid initting two logging systems, since when this test is run with others it can + // cause us to encounter the tracing span drop bug + telemopts.logging = None; + init_integ_telem(); + let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); + let mut starter = CoreWfStarter::new_with_runtime("workflow_load", rt); starter .worker_config .max_outstanding_workflow_tasks(5_usize) diff --git a/tests/integ_tests/client_tests.rs b/tests/integ_tests/client_tests.rs index 3cf985c02..3801e25c3 100644 --- a/tests/integ_tests/client_tests.rs +++ b/tests/integ_tests/client_tests.rs @@ -1,6 +1,6 @@ use assert_matches::assert_matches; use futures_util::{FutureExt, future::BoxFuture}; -use http_body_util::BodyExt; +use http_body_util::Full; use prost::Message; use std::{ collections::HashMap, @@ -30,7 +30,7 @@ use tokio::{ }; use tonic::{ Code, Request, Status, - body::BoxBody, + body::Body, codegen::{Service, http::Response}, server::NamedService, transport::Server, @@ -108,11 +108,11 @@ struct GenericService { header_tx: UnboundedSender, response_maker: F, } -impl Service> for GenericService +impl Service> for GenericService where - F: FnMut() -> Response, + F: FnMut() -> Response, { - type Response = Response; + type Response = Response; type Error = Infallible; type Future = BoxFuture<'static, Result>; @@ -120,7 +120,7 @@ where Poll::Ready(Ok(())) } - fn call(&mut self, req: tonic::codegen::http::Request) -> Self::Future { + fn call(&mut self, req: tonic::codegen::http::Request) -> Self::Future { self.header_tx .send( String::from_utf8_lossy( @@ -149,7 +149,7 @@ struct FakeServer { async fn fake_server(response_maker: F) -> FakeServer where - F: FnMut() -> Response + Clone + Send + 'static, + F: FnMut() -> Response + Clone + Send + Sync + 'static, { let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); let (header_tx, header_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -191,7 +191,7 @@ impl FakeServer { #[tokio::test] async fn timeouts_respected_one_call_fake_server() { - let mut fs = fake_server(|| Response::new(tonic::codegen::empty_body())).await; + let mut fs = fake_server(|| Response::new(Body::empty())).await; let header_rx = &mut fs.header_rx; let mut opts = get_integ_server_options(); @@ -335,7 +335,7 @@ async fn namespace_header_attached_to_relevant_calls() { .add_service(GenericService { header_to_parse: "Temporal-Namespace", header_tx, - response_maker: || Response::new(tonic::codegen::empty_body()), + response_maker: || Response::new(Body::empty()), }) .serve_with_incoming_shutdown( tokio_stream::wrappers::TcpListenerStream::new(listener), @@ -409,7 +409,7 @@ async fn cloud_ops_test() { assert_eq!(res.into_inner().namespace.unwrap().namespace, namespace); } -fn make_ok_response(message: T) -> Response +fn make_ok_response(message: T) -> Response where T: Message, { @@ -425,9 +425,8 @@ where let len = buf.len() as u32; frame.extend_from_slice(&len.to_be_bytes()); frame.extend_from_slice(&buf); - let full_body = http_body_util::Full::from(frame) - .map_err(|e: Infallible| -> Status { unreachable!("Infallible error: {:?}", e) }); - let body = BoxBody::new(full_body); + let full_body = Full::new(frame.into()); + let body = Body::new(full_body); // Build the HTTP response with the required gRPC headers. Response::builder() diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs index 029a1e45c..1e21f0372 100644 --- a/tests/integ_tests/metrics_tests.rs +++ b/tests/integ_tests/metrics_tests.rs @@ -1,7 +1,13 @@ use crate::integ_tests::mk_nexus_endpoint; use anyhow::anyhow; use assert_matches::assert_matches; -use std::{collections::HashMap, env, string::ToString, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + env, + string::ToString, + sync::{Arc, OnceLock}, + time::Duration, +}; use temporal_client::{ REQUEST_LATENCY_HISTOGRAM_NAME, WorkflowClientTrait, WorkflowOptions, WorkflowService, }; @@ -19,7 +25,10 @@ use temporal_sdk_core_api::{ telemetry::{ HistogramBucketOverrides, OtelCollectorOptionsBuilder, OtlpProtocol, PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder, - metrics::{CoreMeter, MetricAttributes, MetricParameters}, + metrics::{ + CoreMeter, CounterBase, Gauge, GaugeBase, HistogramBase, MetricKeyValue, + MetricParameters, MetricParametersBuilder, NewAttributes, + }, }, worker::{ PollerBehavior, SlotKind, SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, @@ -72,6 +81,7 @@ async fn prometheus_metrics_exported( ) { let mut opts_builder = PrometheusExporterOptionsBuilder::default(); opts_builder + .global_tags(HashMap::from([("global".to_string(), "hi!".to_string())])) .socket_addr(ANY_PORT.parse().unwrap()) .use_seconds_for_durations(use_seconds_latency); if custom_buckets { @@ -99,25 +109,25 @@ async fn prometheus_metrics_exported( let body = get_text(format!("http://{addr}/metrics")).await; assert!(body.contains( - "temporal_request_latency_count{operation=\"ListNamespaces\",service_name=\"temporal-core-sdk\"} 1" + "temporal_request_latency_count{operation=\"ListNamespaces\",service_name=\"temporal-core-sdk\",global=\"hi!\"} 1" )); assert!(body.contains( - "temporal_request_latency_count{operation=\"GetSystemInfo\",service_name=\"temporal-core-sdk\"} 1" + "temporal_request_latency_count{operation=\"GetSystemInfo\",service_name=\"temporal-core-sdk\",global=\"hi!\"} 1" )); if custom_buckets { assert!(body.contains( "temporal_request_latency_bucket{\ - operation=\"GetSystemInfo\",service_name=\"temporal-core-sdk\",le=\"1337\"}" + operation=\"GetSystemInfo\",service_name=\"temporal-core-sdk\",global=\"hi!\",le=\"1337\"}" )); } else if use_seconds_latency { assert!(body.contains( "temporal_request_latency_bucket{\ - operation=\"GetSystemInfo\",service_name=\"temporal-core-sdk\",le=\"0.05\"}" + operation=\"GetSystemInfo\",service_name=\"temporal-core-sdk\",global=\"hi!\",le=\"0.05\"}" )); } else { assert!(body.contains( "temporal_request_latency_bucket{\ - operation=\"GetSystemInfo\",service_name=\"temporal-core-sdk\",le=\"50\"}" + operation=\"GetSystemInfo\",service_name=\"temporal-core-sdk\",global=\"hi!\",le=\"50\"}" )); } // Verify counter names are appropriate (don't end w/ '_total') @@ -125,15 +135,10 @@ async fn prometheus_metrics_exported( // Verify non-temporal metrics meter does not prefix let mm = rt.telemetry().get_metric_meter().unwrap(); let g = mm.inner.gauge(MetricParameters::from("mygauge")); - g.record( - 42, - &MetricAttributes::OTel { - kvs: Arc::new(vec![]), - }, - ); + let attrs = mm.inner.new_attributes(NewAttributes::new(vec![])); + g.record(42, &attrs); let body = get_text(format!("http://{addr}/metrics")).await; - println!("{}", &body); - assert!(body.contains("\nmygauge 42")); + assert!(body.contains("\nmygauge{global=\"hi!\"} 42")); } #[tokio::test] @@ -688,7 +693,7 @@ async fn docker_metrics_with_prometheus( )] otel_collector: (&str, OtlpProtocol), ) { - if std::env::var("DOCKER_PROMETHEUS_RUNNING").is_err() { + if env::var("DOCKER_PROMETHEUS_RUNNING").is_err() { return; } let (otel_collector_addr, otel_protocol) = otel_collector; @@ -730,7 +735,7 @@ async fn docker_metrics_with_prometheus( client.list_namespaces().await.unwrap(); // Give Prometheus time to scrape metrics - tokio::time::sleep(std::time::Duration::from_secs(2)).await; + tokio::time::sleep(Duration::from_secs(2)).await; // Query Prometheus API for metrics let client = reqwest::Client::new(); @@ -767,6 +772,9 @@ async fn activity_metrics() { let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); let wf_name = "activity_metrics"; let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt); + starter + .worker_config + .graceful_shutdown_period(Duration::from_secs(1)); let task_queue = starter.get_task_queue().to_owned(); let mut worker = starter.worker().await; @@ -777,11 +785,6 @@ async fn activity_metrics() { start_to_close_timeout: Some(Duration::from_secs(1)), ..Default::default() }); - let local_act_pass = ctx.local_activity(LocalActivityOptions { - activity_type: "pass_fail_act".to_string(), - input: "pass".as_json_payload().expect("serializes fine"), - ..Default::default() - }); let normal_act_fail = ctx.activity(ActivityOptions { activity_type: "pass_fail_act".to_string(), input: "fail".as_json_payload().expect("serializes fine"), @@ -792,6 +795,12 @@ async fn activity_metrics() { }), ..Default::default() }); + join!(normal_act_pass, normal_act_fail); + let local_act_pass = ctx.local_activity(LocalActivityOptions { + activity_type: "pass_fail_act".to_string(), + input: "pass".as_json_payload().expect("serializes fine"), + ..Default::default() + }); let local_act_fail = ctx.local_activity(LocalActivityOptions { activity_type: "pass_fail_act".to_string(), input: "fail".as_json_payload().expect("serializes fine"), @@ -810,12 +819,8 @@ async fn activity_metrics() { }, ..Default::default() }); - join!( - normal_act_pass, - local_act_pass, - normal_act_fail, - local_act_fail - ); + join!(local_act_pass, local_act_fail); + // TODO: Currently takes a WFT b/c of https://github.com/temporalio/sdk-core/issues/856 local_act_cancel.cancel(&ctx); local_act_cancel.await; Ok(().into()) @@ -824,7 +829,6 @@ async fn activity_metrics() { match i.as_str() { "pass" => Ok("pass"), "cancel" => { - // TODO: Cancel is taking until shutdown to come through :| ctx.cancelled().await; Err(ActivityError::cancelled()) } @@ -1111,6 +1115,7 @@ async fn evict_on_complete_does_not_count_as_forced_eviction() { struct MetricRecordingSlotSupplier { inner: FixedSizeSlotSupplier, + metrics: OnceLock<(Gauge, Gauge, Gauge)>, } #[async_trait::async_trait] @@ -1121,16 +1126,18 @@ where type SlotKind = SK; async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit { - let g = ctx + let (g, _, _) = self.metrics.get_or_init(|| { + let meter = ctx.get_metrics_meter().unwrap(); + let g1 = meter.gauge(MetricParameters::from("custom_reserve")); + let g2 = meter.gauge(MetricParameters::from("custom_mark_used")); + let g3 = meter.gauge(MetricParameters::from("custom_release")); + (g1, g2, g3) + }); + let attrs = ctx .get_metrics_meter() .unwrap() - .gauge(MetricParameters::from("custom_reserve")); - g.record( - 1, - &MetricAttributes::OTel { - kvs: Arc::new(vec![]), - }, - ); + .new_attributes(NewAttributes::new(vec![])); + g.record(1, &attrs); self.inner.reserve_slot(ctx).await } @@ -1139,30 +1146,18 @@ where } fn mark_slot_used(&self, ctx: &dyn SlotMarkUsedContext) { - let g = ctx - .get_metrics_meter() - .unwrap() - .gauge(MetricParameters::from("custom_mark_used")); - g.record( - 1, - &MetricAttributes::OTel { - kvs: Arc::new(vec![]), - }, - ); + let meter = ctx.get_metrics_meter().unwrap(); + let attrs = meter.new_attributes(NewAttributes::new(vec![])); + let (_, g, _) = self.metrics.get().unwrap(); + g.record(1, &attrs); self.inner.mark_slot_used(ctx); } fn release_slot(&self, ctx: &dyn SlotReleaseContext) { - let g = ctx - .get_metrics_meter() - .unwrap() - .gauge(MetricParameters::from("custom_release")); - g.record( - 1, - &MetricAttributes::OTel { - kvs: Arc::new(vec![]), - }, - ); + let meter = ctx.get_metrics_meter().unwrap(); + let attrs = meter.new_attributes(NewAttributes::new(vec![])); + let (_, _, g) = self.metrics.get().unwrap(); + g.record(1, &attrs); self.inner.release_slot(ctx); } @@ -1182,6 +1177,7 @@ async fn metrics_available_from_custom_slot_supplier() { let mut tb = TunerBuilder::default(); tb.workflow_slot_supplier(Arc::new(MetricRecordingSlotSupplier:: { inner: FixedSizeSlotSupplier::new(5), + metrics: OnceLock::new(), })); starter.worker_config.tuner(Arc::new(tb.build())); let mut worker = starter.worker().await; @@ -1202,9 +1198,124 @@ async fn metrics_available_from_custom_slot_supplier() { .unwrap(); worker.run_until_done().await.unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; let body = get_text(format!("http://{addr}/metrics")).await; assert!(body.contains("custom_reserve")); assert!(body.contains("custom_mark_used")); assert!(body.contains("custom_release")); } + +#[tokio::test] +async fn test_prometheus_endpoint_integration() { + let (telemopts, addr, _aborter) = prom_metrics(None); + let meter = telemopts.metrics.unwrap(); + + let counter = meter.counter(MetricParameters { + name: "test_requests_total".into(), + description: "Total number of test requests".into(), + unit: "".into(), + }); + let histogram = meter.histogram(MetricParameters { + name: "test_request_duration_ms".into(), + description: "Duration of test requests in milliseconds".into(), + unit: "ms".into(), + }); + let gauge = meter.gauge(MetricParameters { + name: "test_active_connections".into(), + description: "Number of active test connections".into(), + unit: "".into(), + }); + + counter.adds(5); + histogram.records(100); + gauge.records(10); + + let url = format!("http://{}/metrics", addr); + let response = tokio::time::timeout(Duration::from_secs(10), reqwest::get(&url)) + .await + .expect("Request timed out") + .expect("Request failed"); + + assert!(response.status().is_success()); + + let content_type = response + .headers() + .get("content-type") + .unwrap() + .to_str() + .unwrap(); + assert!(content_type.contains("text/plain")); + + let body = response.text().await.expect("Failed to read response body"); + + assert!(body.contains("test_requests_total"),); + assert!(body.contains("test_request_duration_ms"),); + assert!(body.contains("test_active_connections"),); + assert!(body.contains("test_requests_total 5"),); + assert!(body.contains("test_active_connections 10"),); + assert!(body.contains("test_request_duration_ms_count 1"),); + assert!(body.contains("test_request_duration_ms_sum 100"),); +} + +#[tokio::test] +async fn test_prometheus_metric_format_consistency() { + let (telemopts, addr, _aborter) = prom_metrics(None); + let meter = telemopts.metrics.unwrap(); + + let workflow_counter = meter.counter(MetricParameters { + name: "temporal_workflow_completed_total".into(), + description: "Total number of completed workflows".into(), + unit: "".into(), + }); + let activity_histogram = meter.histogram_duration(MetricParameters { + name: "temporal_activity_execution_latency".into(), + description: "Duration of activity execution".into(), + unit: "ms".into(), + }); + + let attrs = meter.new_attributes(NewAttributes::new(vec![])); + + workflow_counter.add(1, &attrs); + activity_histogram.record(Duration::from_millis(150), &attrs); + + let url = format!("http://{}/metrics", addr); + let response = tokio::time::timeout(Duration::from_secs(10), reqwest::get(&url)) + .await + .expect("Request timed out") + .expect("Request failed"); + + let body = response.text().await.expect("Failed to read response body"); + + assert!(body.contains("# HELP temporal_workflow_completed_total"),); + assert!(body.contains("# TYPE temporal_workflow_completed_total counter"),); + assert!(body.contains("# HELP temporal_activity_execution_latency"),); + assert!(body.contains("# TYPE temporal_activity_execution_latency histogram"),); + assert!(body.contains("temporal_workflow_completed_total 1"),); + assert!(body.contains("temporal_activity_execution_latency_count 1"),); + assert!(body.contains("temporal_activity_execution_latency_bucket"),); + assert!(body.contains("le=\"")); +} + +#[tokio::test] +async fn prometheus_label_nonsense() { + let mut opts_builder = PrometheusExporterOptionsBuilder::default(); + opts_builder.socket_addr(ANY_PORT.parse().unwrap()); + let (telemopts, addr, _aborter) = prom_metrics(Some(opts_builder.build().unwrap())); + let meter = telemopts.metrics.clone().unwrap(); + + let ctr = meter.counter( + MetricParametersBuilder::default() + .name("some_counter") + .build() + .unwrap(), + ); + let a1 = meter.new_attributes(NewAttributes::from([MetricKeyValue::new("thing", "foo")])); + let a2 = meter.new_attributes(NewAttributes::from([MetricKeyValue::new("blerp", "baz")])); + ctr.add(1, &a1); + ctr.add(1, &a2); + ctr.add(1, &a2); + ctr.add(1, &a1); + + let body = get_text(format!("http://{addr}/metrics")).await; + assert!(body.contains("some_counter{thing=\"foo\"} 2")); + assert!(body.contains("some_counter{blerp=\"baz\"} 2")); +} diff --git a/tests/integ_tests/polling_tests.rs b/tests/integ_tests/polling_tests.rs index 4cd89859b..8a8aadccf 100644 --- a/tests/integ_tests/polling_tests.rs +++ b/tests/integ_tests/polling_tests.rs @@ -169,7 +169,7 @@ async fn switching_worker_client_changes_poll() { // Create a worker only on the first server let worker = init_worker( - init_integ_telem(), + init_integ_telem().unwrap(), integ_worker_config("my-task-queue") // We want a cache so we don't get extra remove-job activations .max_cached_workflows(100_usize)