Member

@cretz cretz commented Jul 23, 2025

What was changed

  • Added temporal_client::callback_based module with a Tonic-compatible Tower service implementation that invokes a callback instead of making a network call
  • Added connect_no_namespace_with_service_override overload that accepts an optional callback service (and moved the existing logic from connect_no_namespace into it)
  • Adapted temporal_client::metrics::GrpcMetricSvc to work with either a channel or a callback-based service
  • Added grpc_override_callback option in the C bridge's ClientOptions that can be set to a C function to use as the callback
  • Added structures and methods to support C-based callbacks, with a careful eye toward lifetimes
  • Added minimal test to confirm some behaviors
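
To make the idea in the bullets above concrete, here is a minimal std-only sketch of a transport that either goes to the network or invokes an in-memory callback. Every name in it (`GrpcCall`, `Callback`, `Transport`, `send`) is hypothetical and chosen for illustration; the actual change builds on Tonic and Tower and its types are not reproduced here.

```rust
use std::sync::Arc;

/// Hypothetical request handed to the callback: owned service and RPC names
/// plus the protobuf bytes of the message.
pub struct GrpcCall {
    pub service: String,
    pub rpc: String,
    pub proto: Vec<u8>,
}

/// Hypothetical callback type: returns response proto bytes or an error message.
pub type Callback = Arc<dyn Fn(GrpcCall) -> Result<Vec<u8>, String> + Send + Sync>;

/// Either a real network channel (elided in this sketch) or an in-memory callback.
pub enum Transport {
    Network,
    Callback(Callback),
}

impl Transport {
    /// Dispatch a call to whichever transport is configured.
    pub fn send(&self, call: GrpcCall) -> Result<Vec<u8>, String> {
        match self {
            // The network path is deliberately not implemented here.
            Transport::Network => Err("network transport is not part of this sketch".to_string()),
            // The callback path never touches the network.
            Transport::Callback(cb) => cb(call),
        }
    }
}
```

A caller would construct `Transport::Callback` with a closure and call `send` exactly as it would for the network variant, which is the property that lets the rest of the client stay unchanged.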

Missing/future features:

@cretz cretz requested a review from a team as a code owner July 23, 2025 21:44
  impl Service<http::Request<Body>> for GrpcMetricSvc {
      type Response = http::Response<Body>;
-     type Error = tonic::transport::Error;
+     type Error = Box<dyn std::error::Error + Send + Sync>;
Member Author

I do not believe changing this will cause any issues or serious performance concerns, but would like to have it double checked
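
For reviewers weighing the cost of the boxed error type, the following std-only sketch shows how a `Box<dyn Error + Send + Sync>` behaves: concrete errors convert into it through `?` or `.into()`, and the allocation happens only when an error is actually constructed. The names `BoxError`, `CallbackFailed`, and `parse_then_check` are hypothetical and exist only for this example.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical alias with the same shape as the new associated error type.
pub type BoxError = Box<dyn Error + Send + Sync>;

/// Hypothetical error a callback-based transport might produce.
#[derive(Debug)]
pub struct CallbackFailed(pub String);

impl fmt::Display for CallbackFailed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "callback failed: {}", self.0)
    }
}

impl Error for CallbackFailed {}

/// Two different concrete error types flow into the same boxed type.
pub fn parse_then_check(input: &str) -> Result<u32, BoxError> {
    // `?` converts ParseIntError into BoxError through the standard `From` impl.
    let n: u32 = input.parse()?;
    if n == 0 {
        // `.into()` boxes our own error type the same way.
        return Err(CallbackFailed("zero is not allowed".to_string()).into());
    }
    // The success path performs no boxing at all.
    Ok(n)
}
```

Callers that need the concrete error back can still use `downcast_ref::<CallbackFailed>()` on the boxed value.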

Member

@Sushisource Sushisource left a comment

In general looking good!

let req = GrpcRequest {
    service: path_parts.next().unwrap_or_default(),
    rpc: path_parts.next().unwrap_or_default(),
    headers: &parts.headers,
Member

Why manipulate the body at all? If we pass through the compression flag and length, we can document that and allow the callback implementer to handle compression if they want

Member Author

@cretz cretz Jul 24, 2025

Because most gRPC clients deal in protos, not full bodies with the extra 5 bytes in front. For callback implementers that are, say, delegating to their own in-language clients, those clients operate on protobuf bodies, not raw HTTP ones. I don't think we should ask them to carve up the body to get the proto out (or put it back). If there is a use case for needing to know the compression byte, we can add it, but we are in control of the client call so it will always be what we want anyways.

Member

Right, what I'm saying is the callback client isn't really a pure gRPC client, because it may want to delegate to something that implements compression too. So seems to me we should just document that and leave it to the caller.

Member Author

@cretz cretz Jul 25, 2025

They can't enable compression; we'd have to, with our Tonic client, which is what builds this body. But there's no value in us doing so in this case since it's all in memory. If they want to delegate to something that implements compression, they can and should. They can wrap the whole proto bytes in pre-negotiated compression if they'd like. But from our in-memory perspective, we give them proto bytes; how that's represented on the in-memory wire via the few bits of Rust code between Tonic and this callback should have no effect on the user. It's up to us, and it should always be 0 (no compression) IMO. The compression byte is about a compression algorithm pre-negotiated with the server, which doesn't apply in memory, nor should it.

Overall, this compression flag is just a boolean used by gRPC over HTTP, but it has no value for the in-memory representation and will always be 0. It doesn't compress the bytes or even tell you the algorithm; it's just a note saying the negotiated compression with the server is in effect (there is no server here). This is unrelated to whether a user wants to use compression when talking to their upstream implementation.
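
For readers unfamiliar with the "extra 5 bytes" discussed in this thread: a gRPC message on the wire is prefixed with a 1-byte compressed flag and a 4-byte big-endian length. The sketch below strips and re-adds that prefix for a single, uncompressed message. The function names `unframe` and `frame` are hypothetical, and the code assumes exactly one message per body; it is not the code from this PR.

```rust
/// Strip the 5-byte gRPC message prefix (1 compressed-flag byte followed by a
/// 4-byte big-endian length) and return the flag plus the protobuf bytes.
/// Assumes the body holds exactly one message.
pub fn unframe(body: &[u8]) -> Result<(bool, &[u8]), String> {
    if body.len() < 5 {
        return Err("body is shorter than the 5-byte prefix".to_string());
    }
    let compressed = match body[0] {
        0 => false,
        1 => true,
        other => return Err(format!("invalid compressed flag: {other}")),
    };
    let declared = u32::from_be_bytes([body[1], body[2], body[3], body[4]]) as usize;
    let message = &body[5..];
    if message.len() != declared {
        return Err(format!("declared length {declared}, found {}", message.len()));
    }
    Ok((compressed, message))
}

/// Put the prefix back in front of response protobuf bytes, always with the
/// compressed flag set to 0.
pub fn frame(proto: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(5 + proto.len());
    out.push(0); // compressed flag: always 0 in this sketch
    out.extend_from_slice(&(proto.len() as u32).to_be_bytes());
    out.extend_from_slice(proto);
    out
}
```

With helpers like these on the Rust side, a callback implementer only ever sees the protobuf bytes, which is the behavior argued for above.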

Member

Aaa, ok that makes sense

Comment on lines +170 to +176
// We have to cast this to a literal pointer integer because we use spawn_blocking
// and Rust can't validate things in either of two approaches. The first approach,
// just moving the *mut to spawn_blocking closure, will not work because it is not
// Send (even if you wrap it in a marked-Send struct). The second approach, moving
// the box to the closure and into_raw'ing it there, won't work because Rust thinks
// the "req" param to spawn_blocking may outlive this closure even though we're
// confident in our oneshot use this will never happen.
Member

This makes sense to me. AFAIK there is no safe way to express this (if you want to keep using spawn_blocking).

Because spawn_blocking may run the closure on another thread, the closure must be Send, and raw pointers are never Send. The lifetime issue is also unsolvable: https://without.boats/blog/the-scoped-task-trilemma/

This explanation is great, but I'd also like a quick summary along the lines of: // SAFETY: This is safe because the spawned task is guaranteed to be joined, and the box reclaimed, before this function exits

However, writing that - is it actually safe in the error case? Seems like we might need to double check there that the user did call the response callback, and free the pointer if they didn't.
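
The pointer-as-integer technique under discussion can be sketched with the standard library alone. This example substitutes `std::thread::spawn` for tokio's spawn_blocking and an `mpsc` channel for the oneshot, and every name in it (`Request`, `user_callback`, `call_via_thread`) is hypothetical. It covers only the happy path where the callback responds.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical heap-allocated request; stands in for the real one.
pub struct Request {
    pub payload: Vec<u8>,
    /// Used once to respond, playing the role of the oneshot sender.
    pub reply: mpsc::Sender<usize>,
}

/// Stand-in for the user callback: takes ownership through the raw pointer,
/// responds, and frees the allocation when the Box drops.
///
/// # Safety
/// `req` must come from `Box::into_raw` and must not be used again afterwards.
pub unsafe fn user_callback(req: *mut Request) {
    let req = unsafe { Box::from_raw(req) };
    let _ = req.reply.send(req.payload.len());
}

/// Hand a boxed request to another thread by passing its address as an integer.
pub fn call_via_thread(payload: Vec<u8>) -> usize {
    let (tx, rx) = mpsc::channel();
    let raw: *mut Request = Box::into_raw(Box::new(Request { payload, reply: tx }));
    // A `*mut Request` is not Send, so move its address as a plain usize.
    let addr = raw as usize;
    let handle = thread::spawn(move || {
        // SAFETY: `addr` came from `Box::into_raw` above and is used exactly once.
        unsafe { user_callback(addr as *mut Request) }
    });
    let len = rx.recv().expect("callback did not respond");
    handle.join().expect("callback thread panicked");
    len
}
```

The integer cast sidesteps the Send check entirely, so the compiler no longer helps here; correctness rests on the pointer being reclaimed exactly once.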

Member Author

@cretz cretz Jul 24, 2025

> Raw pointers are never send

The Rustonomicon says you can cheat this (https://doc.rust-lang.org/nomicon/send-and-sync.html), and I have cheated like this before, but it still did not work for me here due to other issues.

> However, writing that - is it actually safe in the error case? Seems like we might need to double check there that the user did call the response callback, and free the pointer if they didn't.

If they never call respond, receiver.await never completes. Note that the only place the sender can ever be dropped is in that same respond call.

Member

Right, but the error can be returned before that await point.

Member Author

@cretz cretz Jul 25, 2025

Hrmm, so from my reading, in this situation spawn_blocking may return an error not caused by the user callback code panicking when 1) the tokio runtime is shutting down (is_cancelled) or 2) tokio cannot schedule the thread (a form of panic). Both should be rare, but I suppose they are technically possible. I will look into making an atomic free call on the error return there.

EDIT: Actually, I'm struggling to know whether it reached user code or not. We definitely don't want to free if it did, right? What if the user callback did the respond and then panicked (e.g. threw an exception from their lang)? This would be the second free attempt, but the memory would already be invalid, so you can't check whether it was freed before. I guess I can have some kind of Send/Arc bool that I set just before invoking the user callback so we know their code is responsible for freeing now.

Member

Well, that's the thing: you can't necessarily know (I agree it's very rare though). Having some bool flag that gets set when the response is called is what I was thinking.

Member Author

Note, I am still planning on doing this before I merge.

Member Author

@cretz cretz Aug 14, 2025

Code update. Now if user code is not run, we drop this ourselves on spawn_blocking error (the only case where user code wouldn't run)
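
A rough, std-only sketch of the ownership handoff described in this thread: a shared flag is set just before user code would run, and the allocation is reclaimed on the error path only when that flag is still unset. The spawner here is a hypothetical synchronous stand-in (`try_spawn`) so the failure can be triggered on demand, and `Request`, `DROPS`, and `dispatch` are likewise illustrative names, not the merged code.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

/// Counts drops so it is observable that each request is freed exactly once.
pub static DROPS: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical request type whose drops are counted.
pub struct Request {
    pub body: Vec<u8>,
}

impl Drop for Request {
    fn drop(&mut self) {
        DROPS.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for a spawner that may refuse to run the task at all,
/// for example because the runtime is shutting down.
pub fn try_spawn(run: bool, task: impl FnOnce()) -> Result<(), &'static str> {
    if run {
        task();
        Ok(())
    } else {
        Err("runtime shutting down")
    }
}

pub fn dispatch(run: bool) -> Result<(), &'static str> {
    let addr = Box::into_raw(Box::new(Request { body: vec![1, 2, 3] })) as usize;
    let handed_off = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&handed_off);
    let result = try_spawn(run, move || {
        // From this point on, user code is responsible for the allocation.
        flag.store(true, Ordering::SeqCst);
        // Stand-in for the user callback, which frees the request on respond.
        drop(unsafe { Box::from_raw(addr as *mut Request) });
    });
    if result.is_err() && !handed_off.load(Ordering::SeqCst) {
        // SAFETY: user code never ran, so nothing else has reclaimed the box.
        drop(unsafe { Box::from_raw(addr as *mut Request) });
    }
    result
}
```

Setting the flag before invoking user code, rather than after the response arrives, is what avoids a double free when a callback responds and then panics.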

Member Author

@cretz cretz left a comment

Note, I am holding off on merging while I wait a bit longer on others interested to confirm this works for them.

Member Author

cretz commented Aug 14, 2025

Changed gRPC request to just copy the service and RPC strings instead of the lifetime dance of keeping a &str reference around. The cost is negligible (we were already collecting the entire body off the stream and we are still moving the header map).
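
As an illustration of the owned-string approach, here is a small sketch that splits a gRPC request path of the form `/{service}/{rpc}` into two owned `String`s. The `RpcTarget` struct and `parse_path` function are hypothetical names for this example only.

```rust
/// Hypothetical owned request metadata; no lifetimes are tied to the HTTP request.
#[derive(Debug, PartialEq)]
pub struct RpcTarget {
    pub service: String,
    pub rpc: String,
}

/// Split a gRPC request path of the form `/{service}/{rpc}` into owned strings.
/// Missing parts become empty strings, mirroring `unwrap_or_default`.
pub fn parse_path(path: &str) -> RpcTarget {
    let mut parts = path.trim_start_matches('/').splitn(2, '/');
    RpcTarget {
        // Copying two short strings is cheap next to collecting the whole body.
        service: parts.next().unwrap_or_default().to_owned(),
        rpc: parts.next().unwrap_or_default().to_owned(),
    }
}
```

Because the result owns its data, it can be moved into a blocking task or across an FFI boundary without borrowing from the original request.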

Member Author

cretz commented Aug 15, 2025

Added user data and some minor doc stuff. Waiting for external user confirmation of behavior before merging.
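
For context, "user data" in a C callback API conventionally means an opaque pointer that the caller supplies alongside the function pointer and that is passed back unchanged on every invocation. The sketch below shows that general pattern under this assumption; the names `OverrideCallback`, `Options`, `invoke`, and `add_to_total` are hypothetical and do not reflect the actual C bridge signatures.

```rust
use std::os::raw::c_void;

/// Hypothetical C-compatible callback signature with an opaque user-data pointer.
pub type OverrideCallback = unsafe extern "C" fn(user_data: *mut c_void, value: u32);

/// Hypothetical options struct pairing the callback with its user data.
#[repr(C)]
pub struct Options {
    pub callback: Option<OverrideCallback>,
    pub user_data: *mut c_void,
}

/// Invoke the callback if one is set, handing the user data back untouched.
/// Returns whether a callback was invoked.
pub fn invoke(opts: &Options, value: u32) -> bool {
    match opts.callback {
        Some(cb) => {
            // SAFETY: the caller promises `user_data` is valid for this callback.
            unsafe { cb(opts.user_data, value) };
            true
        }
        None => false,
    }
}

/// Example callback that treats the user data as a pointer to a running total.
///
/// # Safety
/// `user_data` must point to a valid, exclusively accessible `u32`.
pub unsafe extern "C" fn add_to_total(user_data: *mut c_void, value: u32) {
    let total = unsafe { &mut *(user_data as *mut u32) };
    *total += value;
}
```

The user-data pointer lets a host language route the call back to a specific client object without relying on global state.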

Member Author

cretz commented Aug 18, 2025

User confirmed all good. Waiting on #971 which will have some CI fixes.

@cretz cretz merged commit 871b320 into master Aug 18, 2025
29 of 31 checks passed
@cretz cretz deleted the interceptable-client branch August 18, 2025 19:55