Skip to content

💥 Replace OTel Prometheus Exporter #942

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ src/protos/*.rs
# Keep secrets here
/.cloud_certs/
cloud_envs.fish
/.claude/settings.local.json
6 changes: 6 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ document as your quick reference when submitting pull requests.
- `tests/` – integration, heavy, and manual tests
- `arch_docs/` – architectural design documents
- Contributor guide: `README.md`
- `target/` - This contains compiled files. You never need to look in here.

## Repo Specific Utilities

Expand All @@ -38,6 +39,9 @@ cargo integ-test # integration tests (starts ephemeral server by default)
cargo test --test heavy_tests # load tests -- agents do not need to run this and should not
```

Rust compilation can take some time. Do not interrupt builds or tests unless they are taking more
than 10 minutes.

Additional checks:

```bash
Expand All @@ -61,6 +65,8 @@ Documentation can be generated with `cargo doc`.
Reviewers will look for:

- All builds, tests, and lints passing in CI
- Note that some tests cause intentional panics. That does not mean the test failed. You should
only consider tests that have failed according to the harness to be a real problem.
- New tests covering behavior changes
- Clear and concise code following existing style (see `README.md` for error handling guidance)
- Documentation updates for any public API changes
Expand Down
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ license-file = "LICENSE.txt"

[workspace.dependencies]
derive_builder = "0.20"
derive_more = { version = "2.0", features = ["constructor", "display", "from", "into", "debug"] }
derive_more = { version = "2.0", features = ["constructor", "display", "from", "into", "debug", "try_into"] }
thiserror = "2"
tonic = "0.12"
tonic-build = "0.12"
opentelemetry = { version = "0.29", features = ["metrics"] }
tonic = "0.13"
tonic-build = "0.13"
opentelemetry = { version = "0.30", features = ["metrics"] }
prost = "0.13"
prost-types = "0.13"

Expand Down
5 changes: 3 additions & 2 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ backoff = "0.4"
base64 = "0.22"
derive_builder = { workspace = true }
derive_more = { workspace = true }
bytes = "1.10"
futures-util = { version = "0.3", default-features = false }
futures-retry = "0.6.0"
http = "1.1.0"
http = "1.1"
http-body-util = "0.1"
hyper = { version = "1.4.1" }
hyper-util = "0.1.6"
Expand All @@ -31,7 +32,7 @@ parking_lot = "0.12"
slotmap = "1.0"
thiserror = { workspace = true }
tokio = "1.1"
tonic = { workspace = true, features = ["tls", "tls-roots"] }
tonic = { workspace = true, features = ["tls-ring", "tls-native-roots"] }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this related to the changes for this PR? Any concerns changing this?

Copy link
Member Author

@Sushisource Sushisource Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's part of the reason for all this, so that we could upgrade Tonic. The new feature flags should be the exact equivalent of what it was previously, they just got renamed

tower = { version = "0.5", features = ["util"] }
tracing = "0.1"
url = "2.2"
Expand Down
23 changes: 16 additions & 7 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use temporal_sdk_core_protos::{
};
use tonic::{
Code,
body::BoxBody,
body::Body,
client::GrpcService,
codegen::InterceptedService,
metadata::{MetadataKey, MetadataMap, MetadataValue},
Expand Down Expand Up @@ -595,7 +595,7 @@ fn get_decode_max_size() -> usize {
impl<T> TemporalServiceClient<T>
where
T: Clone,
T: GrpcService<BoxBody> + Send + Clone + 'static,
T: GrpcService<Body> + Send + Clone + 'static,
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
T::Error: Into<tonic::codegen::StdError>,
<T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
Expand Down Expand Up @@ -1175,13 +1175,13 @@ impl From<common::v1::Priority> for Priority {
impl<T> WorkflowClientTrait for T
where
T: RawClientLike + NamespacedClient + Clone + Send + Sync + 'static,
<Self as RawClientLike>::SvcType: GrpcService<BoxBody> + Send + Clone + 'static,
<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::ResponseBody:
<Self as RawClientLike>::SvcType: GrpcService<Body> + Send + Clone + 'static,
<<Self as RawClientLike>::SvcType as GrpcService<Body>>::ResponseBody:
tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::Error:
<<Self as RawClientLike>::SvcType as GrpcService<Body>>::Error:
Into<tonic::codegen::StdError>,
<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::Future: Send,
<<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::ResponseBody
<<Self as RawClientLike>::SvcType as GrpcService<Body>>::Future: Send,
<<<Self as RawClientLike>::SvcType as GrpcService<Body>>::ResponseBody
as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
{
async fn start_workflow(
Expand Down Expand Up @@ -1673,6 +1673,15 @@ impl<T> RequestExt for tonic::Request<T> {
}
}

macro_rules! dbg_panic {
($($arg:tt)*) => {
use tracing::error;
error!($($arg)*);
debug_assert!(false, $($arg)*);
};
}
pub(crate) use dbg_panic;

#[cfg(test)]
mod tests {
use super::*;
Expand Down
80 changes: 57 additions & 23 deletions client/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::{AttachMetricLabels, CallType};
use crate::{AttachMetricLabels, CallType, dbg_panic};
use futures_util::{FutureExt, future::BoxFuture};
use std::{
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
use temporal_sdk_core_api::telemetry::metrics::{
CoreMeter, Counter, HistogramDuration, MetricAttributes, MetricKeyValue, MetricParameters,
TemporalMeter,
CoreMeter, Counter, CounterBase, HistogramDuration, HistogramDurationBase, MetricAttributable,
MetricAttributes, MetricKeyValue, MetricParameters, TemporalMeter,
};
use tonic::{Code, body::BoxBody, transport::Channel};
use tonic::{Code, body::Body, transport::Channel};
use tower::Service;

/// The string name (which may be prefixed) for this metric
Expand All @@ -26,22 +26,24 @@ pub(crate) struct MetricsContext {
meter: Arc<dyn CoreMeter>,
kvs: MetricAttributes,
poll_is_long: bool,
instruments: Instruments,
}
#[derive(Clone)]
struct Instruments {
svc_request: Counter,
svc_request_failed: Counter,
long_svc_request: Counter,
long_svc_request_failed: Counter,

svc_request: Arc<dyn Counter>,
svc_request_failed: Arc<dyn Counter>,
long_svc_request: Arc<dyn Counter>,
long_svc_request_failed: Arc<dyn Counter>,

svc_request_latency: Arc<dyn HistogramDuration>,
long_svc_request_latency: Arc<dyn HistogramDuration>,
svc_request_latency: HistogramDuration,
long_svc_request_latency: HistogramDuration,
}

impl MetricsContext {
pub(crate) fn new(tm: TemporalMeter) -> Self {
let meter = tm.inner;
Self {
kvs: meter.new_attributes(tm.default_attribs),
poll_is_long: false,
let kvs = meter.new_attributes(tm.default_attribs);
let instruments = Instruments {
svc_request: meter.counter(MetricParameters {
name: "request".into(),
description: "Count of client request successes by rpc name".into(),
Expand Down Expand Up @@ -72,6 +74,11 @@ impl MetricsContext {
unit: "duration".into(),
description: "Histogram of client long-poll request latencies".into(),
}),
};
Self {
kvs,
poll_is_long: false,
instruments,
meter,
}
}
Expand All @@ -81,6 +88,33 @@ impl MetricsContext {
self.kvs = self
.meter
.extend_attributes(self.kvs.clone(), new_kvs.into());

let _ = self
.instruments
.svc_request
.with_attributes(&self.kvs)
.and_then(|v| {
self.instruments.svc_request = v;
self.instruments.long_svc_request.with_attributes(&self.kvs)
})
.and_then(|v| {
self.instruments.long_svc_request = v;
self.instruments
.svc_request_latency
.with_attributes(&self.kvs)
})
.and_then(|v| {
self.instruments.svc_request_latency = v;
self.instruments
.long_svc_request_latency
.with_attributes(&self.kvs)
})
.map(|v| {
self.instruments.long_svc_request_latency = v;
})
.inspect_err(|e| {
dbg_panic!("Failed to extend client metrics attributes: {:?}", e);
});
}

pub(crate) fn set_is_long_poll(&mut self) {
Expand All @@ -90,9 +124,9 @@ impl MetricsContext {
/// A request to the temporal service was made
pub(crate) fn svc_request(&self) {
if self.poll_is_long {
self.long_svc_request.add(1, &self.kvs);
self.instruments.long_svc_request.adds(1);
} else {
self.svc_request.add(1, &self.kvs);
self.instruments.svc_request.adds(1);
}
}

Expand All @@ -108,18 +142,18 @@ impl MetricsContext {
&self.kvs
};
if self.poll_is_long {
self.long_svc_request_failed.add(1, kvs);
self.instruments.long_svc_request_failed.add(1, kvs);
} else {
self.svc_request_failed.add(1, kvs);
self.instruments.svc_request_failed.add(1, kvs);
}
}

/// Record service request latency
pub(crate) fn record_svc_req_latency(&self, dur: Duration) {
if self.poll_is_long {
self.long_svc_request_latency.record(dur, &self.kvs);
self.instruments.long_svc_request_latency.records(dur);
} else {
self.svc_request_latency.record(dur, &self.kvs);
self.instruments.svc_request_latency.records(dur);
}
}
}
Expand Down Expand Up @@ -177,16 +211,16 @@ pub struct GrpcMetricSvc {
pub(crate) disable_errcode_label: bool,
}

impl Service<http::Request<BoxBody>> for GrpcMetricSvc {
type Response = http::Response<BoxBody>;
impl Service<http::Request<Body>> for GrpcMetricSvc {
type Response = http::Response<Body>;
type Error = tonic::transport::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, mut req: http::Request<BoxBody>) -> Self::Future {
fn call(&mut self, mut req: http::Request<Body>) -> Self::Future {
let metrics = self
.metrics
.clone()
Expand Down
26 changes: 13 additions & 13 deletions client/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use temporal_sdk_core_protos::{
};
use tonic::{
Request, Response, Status,
body::BoxBody,
body::Body,
client::GrpcService,
metadata::{AsciiMetadataValue, KeyAndValueRef},
};
Expand Down Expand Up @@ -166,7 +166,7 @@ where
impl<T> RawClientLike for TemporalServiceClient<T>
where
T: Send + Sync + Clone + 'static,
T: GrpcService<BoxBody> + Send + Clone + 'static,
T: GrpcService<Body> + Send + Clone + 'static,
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
T::Error: Into<tonic::codegen::StdError>,
<T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
Expand Down Expand Up @@ -221,7 +221,7 @@ where
impl<T> RawClientLike for ConfiguredClient<TemporalServiceClient<T>>
where
T: Send + Sync + Clone + 'static,
T: GrpcService<BoxBody> + Send + Clone + 'static,
T: GrpcService<Body> + Send + Clone + 'static,
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
T::Error: Into<tonic::codegen::StdError>,
<T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
Expand Down Expand Up @@ -373,7 +373,7 @@ pub(super) struct IsUserLongPoll;
impl<RC, T> WorkflowService for RC
where
RC: RawClientLike<SvcType = T>,
T: GrpcService<BoxBody> + Send + Clone + 'static,
T: GrpcService<Body> + Send + Clone + 'static,
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
T::Error: Into<tonic::codegen::StdError>,
T::Future: Send,
Expand All @@ -383,7 +383,7 @@ where
impl<RC, T> OperatorService for RC
where
RC: RawClientLike<SvcType = T>,
T: GrpcService<BoxBody> + Send + Clone + 'static,
T: GrpcService<Body> + Send + Clone + 'static,
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
T::Error: Into<tonic::codegen::StdError>,
T::Future: Send,
Expand All @@ -393,7 +393,7 @@ where
impl<RC, T> CloudService for RC
where
RC: RawClientLike<SvcType = T>,
T: GrpcService<BoxBody> + Send + Clone + 'static,
T: GrpcService<Body> + Send + Clone + 'static,
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
T::Error: Into<tonic::codegen::StdError>,
T::Future: Send,
Expand All @@ -403,7 +403,7 @@ where
impl<RC, T> TestService for RC
where
RC: RawClientLike<SvcType = T>,
T: GrpcService<BoxBody> + Send + Clone + 'static,
T: GrpcService<Body> + Send + Clone + 'static,
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
T::Error: Into<tonic::codegen::StdError>,
T::Future: Send,
Expand All @@ -413,7 +413,7 @@ where
impl<RC, T> HealthService for RC
where
RC: RawClientLike<SvcType = T>,
T: GrpcService<BoxBody> + Send + Clone + 'static,
T: GrpcService<Body> + Send + Clone + 'static,
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
T::Error: Into<tonic::codegen::StdError>,
T::Future: Send,
Expand Down Expand Up @@ -483,13 +483,13 @@ macro_rules! proxier {
pub trait $trait_name: RawClientLike
where
// Yo this is wild
<Self as RawClientLike>::SvcType: GrpcService<BoxBody> + Send + Clone + 'static,
<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::ResponseBody:
<Self as RawClientLike>::SvcType: GrpcService<Body> + Send + Clone + 'static,
<<Self as RawClientLike>::SvcType as GrpcService<Body>>::ResponseBody:
tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::Error:
<<Self as RawClientLike>::SvcType as GrpcService<Body>>::Error:
Into<tonic::codegen::StdError>,
<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::Future: Send,
<<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::ResponseBody
<<Self as RawClientLike>::SvcType as GrpcService<Body>>::Future: Send,
<<<Self as RawClientLike>::SvcType as GrpcService<Body>>::ResponseBody
as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
{
$(
Expand Down
4 changes: 2 additions & 2 deletions core-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ categories = ["development-tools"]

[features]
otel_impls = ["dep:opentelemetry"]
envconfig = ["dep:toml", "dep:serde", "dep:dirs", "dep:anyhow"]
envconfig = ["dep:toml", "dep:serde", "dep:dirs"]

[dependencies]
anyhow = { version = "1.0", optional = true }
async-trait = "0.1"
dirs = { version = "5.0", optional = true }
derive_builder = { workspace = true }
Expand All @@ -29,6 +28,7 @@ serde_json = "1.0"
thiserror = { workspace = true }
toml = { version = "0.8", optional = true }
tonic = { workspace = true }
tracing = "0.1"
tracing-core = "0.1"
url = "2.3"

Expand Down
Loading
Loading