[DRAFT] Process wide worker heartbeat #962

Closed

Conversation

@yuandrew (Contributor) commented Jul 23, 2025

What was changed

Why?

Checklist

  1. Closes

  2. How was this tested:

  3. Any docs updates needed?

core/src/lib.rs Outdated
if runtime.heartbeat_worker.get().is_none() {
let process_key = Uuid::new_v4();
// TODO: set max_concurrent_nexus_polls to 1?
// let nexus_config = WorkerConfig {
Contributor Author

It seems like we want a separate config for this worker. Or do we want to group it with the existing WorkerConfig? But then what if different workers on the same namespace have two different configs?

Also, I'm not sure which config options need to be unique here. I'm thinking potentially max_concurrent_nexus_polls, and heartbeat_interval at least?

Member

So, yeah, the config for these workers should be entirely in our control, not derived from the other workers' configs.

For example, if you look at how we initialize replay workers, we set a lot of values to 1 or disable things; it's the same story here, really.

Off the top of my head we probably want:

  • Disable workflow and activity polling - this will happen by just not calling the poll APIs, but we can also explicitly set no_remote_activities: true.
  • Probably use poller autoscaling for the nexus poller with minimum/initial set to 1 and max set fairly low like 10.
  • Fixed size nexus slots. Honestly not sure what the value should be here, but we can start with something low to begin with.

Pretty much everything else can just be default or off or something, whatever makes sense.

The only option that needs to be derived from an existing worker here is the newly added heartbeat_interval. The problem is that workers sharing a heartbeat could specify conflicting values, so we can just always pick the smallest value among them. That should be noted in the docstring.
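As a rough sketch of what that shared worker's config could look like (illustrative types only; these are not the real WorkerConfig fields or builder API):

use std::time::Duration;

#[derive(Debug, Clone)]
enum PollerBehavior {
    /// Autoscaling poller: start at `initial`, stay between `minimum` and `maximum`.
    Autoscaling { minimum: usize, maximum: usize, initial: usize },
}

#[derive(Debug, Clone)]
struct SharedHeartbeatWorkerConfig {
    namespace: String,
    /// Workflow/activity polling is simply never started; this just makes it explicit.
    no_remote_activities: bool,
    /// Nexus is the only poller this worker runs.
    nexus_task_poller_behavior: PollerBehavior,
    /// Fixed-size nexus slot count; start low until we know more.
    max_outstanding_nexus_tasks: usize,
    /// Smallest heartbeat_interval among the workers sharing this heartbeat worker.
    heartbeat_interval: Duration,
}

fn shared_heartbeat_config(namespace: String, member_intervals: &[Duration]) -> SharedHeartbeatWorkerConfig {
    SharedHeartbeatWorkerConfig {
        namespace,
        no_remote_activities: true,
        nexus_task_poller_behavior: PollerBehavior::Autoscaling { minimum: 1, maximum: 10, initial: 1 },
        max_outstanding_nexus_tasks: 5,
        // Conflicting intervals across workers resolve to the smallest one.
        heartbeat_interval: member_intervals.iter().copied().min().unwrap_or(Duration::from_secs(60)),
    }
}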

}
_ = reset_notify.notified() => {
ticker.reset();
}
// TODO: handle nexus tasks
res = manager.next_nexus_task() => {
Contributor Author

Currently working with Yuri on this

core/src/lib.rs Outdated
@@ -218,6 +263,9 @@ pub struct CoreRuntime {
telemetry: TelemetryInstance,
runtime: Option<tokio::runtime::Runtime>,
runtime_handle: tokio::runtime::Handle,
heartbeat_worker: OnceLock<Worker>,
heartbeat_fn_map: Arc<Mutex<HeartbeatMap>>,
process_key: Uuid,
Member

Probably this name isn't so great any more now that the grouping isn't necessarily per-process.

Honestly we could just call it task_queue_key since ultimately that's what it is. We'll want to change it in the API too.

Comment on lines 501 to 502
// Process-wide nexus worker
let worker_heartbeat = if let Some(ref details) = heartbeat_details {
Member

The worker itself should no longer need a heartbeat manager at all. Like we talked about on the call, the only variant of WorkerHeartbeatDetails should be the callback (and then it'll just go away).

In fact, I don't think you need to pass in anything to the Worker at all. Workers can just expose a pub(crate) fn capture_heartbeat_details and when you are registering a new worker with the shared heartbeat manager, you just pass in a callback that calls that.
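Roughly this shape, as a sketch (SharedHeartbeatRegistry and the stand-in types here are illustrative, not the real code):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for the WorkerHeartbeat proto message.
#[derive(Default, Clone)]
struct WorkerHeartbeat {}

/// Callback the shared heartbeat worker invokes to capture one worker's details.
type HeartbeatCallback = Arc<dyn Fn() -> WorkerHeartbeat + Send + Sync>;

#[derive(Default)]
struct SharedHeartbeatRegistry {
    // Keyed by worker instance key.
    callbacks: Mutex<HashMap<String, HeartbeatCallback>>,
}

impl SharedHeartbeatRegistry {
    fn register(&self, worker_key: String, cb: HeartbeatCallback) {
        self.callbacks.lock().unwrap().insert(worker_key, cb);
    }

    /// On each heartbeat tick, capture details from every registered worker.
    fn collect(&self) -> Vec<WorkerHeartbeat> {
        self.callbacks.lock().unwrap().values().map(|cb| cb()).collect()
    }
}

struct Worker {}

impl Worker {
    /// The capture fn described above; it builds details fresh on each call.
    fn capture_heartbeat_details(&self) -> WorkerHeartbeat {
        WorkerHeartbeat::default()
    }
}

fn register_worker(registry: &SharedHeartbeatRegistry, key: String, worker: Arc<Worker>) {
    // The only thing the registry needs from the Worker is this closure.
    registry.register(key, Arc::new(move || worker.capture_heartbeat_details()));
}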

}
}

pub(crate) struct WorkerHeartbeatManager {
Member

I think this goes away after the other changes? All we really need is the map on the runtime, or the shared worker on a client.

Comment on lines 37 to 38
#[builder(default = "Arc::new(AtomicUsize::new(0))")]
pub max_cached_workflows: Arc<AtomicUsize>,
Member

The Arc shouldn't be in the config itself - users (lang) shouldn't need to create an Arc or know about it.

Rather, just use the Arc'd atomic everywhere the config value was being referenced.

Contributor Author

AtomicUsize doesn't implement Clone, which WorkerConfig derives

Member

Sorry, I mean the config itself shouldn't change at all. It should stay a normal usize. Then you create the arc'd atomic based on it in init.
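In other words, a sketch (surrounding types made up for illustration):

use std::sync::Arc;
use std::sync::atomic::AtomicUsize;

// The lang-facing config stays plain data; no Arc or atomics exposed.
#[derive(Clone)]
struct WorkerConfig {
    max_cached_workflows: usize,
}

// Internal state created during worker init; this is what the rest of the code references.
struct WorkerState {
    max_cached_workflows: Arc<AtomicUsize>,
}

fn init(config: &WorkerConfig) -> WorkerState {
    WorkerState {
        // Wrap the plain value here so worker commands can mutate it later.
        max_cached_workflows: Arc::new(AtomicUsize::new(config.max_cached_workflows)),
    }
}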

Contributor Author

ahhh, yeah that makes sense. ty

)
}

pub(crate) fn mock_worker_with_heartbeat(mock: MockWorkerClient, config: WorkerConfig) -> Worker {
Contributor Author

Not used right now; I'll use it for the test I need to fix up. Feel free to ignore for now.

@yuandrew (Contributor Author) commented Aug 5, 2025

New changes in the latest revision:

  1. Instead of using heartbeat_callback, now passing WorkerHeartbeatData into SharedNamespaceWorker, both to let it collect heartbeat data and to give SharedNamespaceWorker access to the config fields the server tells it to change. (I'm thinking we can later use traits to limit WorkerHeartbeatData to just what it needs.)
  2. A new WorkerConfigInner that mirrors WorkerConfig but keeps atomic values for worker commands (right now only max_cached_workflows, but this gives us the ability to onboard other config settings later).
  3. For shutdown, when a worker is registered with SharedNamespaceWorker, it is given a callback that removes its own entry from that SharedNamespaceWorker's map, so it can clean itself out of the parent map. The same applies when SharedNamespaceWorker shuts down and removes itself from the CoreRuntime map (see the sketch below).
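For point 3, a minimal sketch of that removal-callback pattern (types trimmed down to just the map bookkeeping; names are illustrative):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type RemoveCallback = Box<dyn FnOnce() + Send>;

#[derive(Default)]
struct SharedNamespaceWorker {
    // Value type elided; the real map holds per-worker heartbeat data.
    workers: Arc<Mutex<HashMap<String, ()>>>,
}

impl SharedNamespaceWorker {
    /// Registering hands the worker back a callback that deletes its own entry,
    /// so the worker can clean itself out of the parent map on shutdown.
    fn register(&self, key: String) -> RemoveCallback {
        self.workers.lock().unwrap().insert(key.clone(), ());
        let map = Arc::clone(&self.workers);
        Box::new(move || {
            map.lock().unwrap().remove(&key);
        })
    }
}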

@yuandrew requested a review from Sushisource on August 5, 2025 23:07
Comment on lines +135 to +138
/// Mirrors `WorkerConfig`, but with atomic structs to allow Worker Commands to make config changes
/// from the server
#[derive(Clone)]
pub(crate) struct WorkerConfigInner {
Member

My first reaction was that I didn't like this at all, but I get why you did it. That said I think we can accomplish the same goal with less repetition.

I think: keep the "new/mutable" fields at the top like you've done here, and then have an original_config field which is an Arc<WorkerConfig> (Arc purely to make copying cheaper). Docstring can make it clear that the contents in there can't change (one rub might be the Tuners, if/when we make that remotely changeable).
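Roughly like this, as a sketch (the mutable field is from the diff; the rest is assumed):

use std::sync::Arc;
use std::sync::atomic::AtomicUsize;

#[derive(Clone)]
pub struct WorkerConfig {
    // ...the full, user-facing config, unchanged...
}

/// Server-mutable fields up front; everything else lives in the immutable original.
#[derive(Clone)]
pub(crate) struct WorkerConfigInner {
    /// Changeable via worker commands.
    pub(crate) max_cached_workflows: Arc<AtomicUsize>,
    /// The rest of the config, fixed after construction; Arc purely to make cloning cheap.
    pub(crate) original_config: Arc<WorkerConfig>,
}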

Comment on lines +83 to +84
features = ["history_builders", "serde_serialize"]
#features = ["history_builders"]
Member

This shouldn't have needed to change, I think.

Comment on lines +33 to +34
pub(crate) type HeartbeatCallback = Arc<dyn Fn() -> WorkerHeartbeat + Send + Sync>;
pub(crate) type WorkerDataMap = HashMap<String, Arc<Mutex<WorkerHeartbeatData>>>;
Member

Short docstrings would be good on these
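Something like this would do (wording is only a suggestion):

/// Callback a registered worker provides so the shared namespace worker can capture that
/// worker's current heartbeat details on each tick.
pub(crate) type HeartbeatCallback = Arc<dyn Fn() -> WorkerHeartbeat + Send + Sync>;

/// Per-namespace map from worker instance key to that worker's heartbeat data, shared with
/// the SharedNamespaceWorker that reports on its behalf.
pub(crate) type WorkerDataMap = HashMap<String, Arc<Mutex<WorkerHeartbeatData>>>;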

Comment on lines +43 to +45
/// SharedNamespaceWorker is responsible for polling worker commands and sending worker heartbeat
/// to the server. This communicates with all workers in the same process that share the same
/// namespace.
Member

Suggested change
/// SharedNamespaceWorker is responsible for polling worker commands and sending worker heartbeat
/// to the server. This communicates with all workers in the same process that share the same
/// namespace.
/// SharedNamespaceWorker is responsible for polling nexus-delivered worker commands and sending worker heartbeats
/// to the server. This communicates with all workers in the same process that share the same
/// namespace.

Comment on lines +281 to +283
// Worker commands
workflow_cache_size: Arc<AtomicUsize>,
workflow_poller_behavior: PollerBehavior,
Member

Why are these in the heartbeat data?

}
}

fn fetch_config(&self) -> fetch_worker_config_response::WorkerConfigEntry {
Member

I don't think this belongs on data. Semantically this is more like another callback.
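For instance, another callback alias registered next to HeartbeatCallback could carry it (shape assumed, not from the diff):

/// Assumed shape: passed in alongside the heartbeat callback rather than living on the
/// data struct itself.
pub(crate) type FetchConfigCallback =
    Arc<dyn Fn() -> fetch_worker_config_response::WorkerConfigEntry + Send + Sync>;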

@@ -121,8 +126,206 @@ pub struct Worker {
local_activities_complete: Arc<AtomicBool>,
/// Used to track all permits have been released
all_permits_tracker: tokio::sync::Mutex<AllPermitsTracker>,
/// Used to shutdown the worker heartbeat task
worker_heartbeat: Option<WorkerHeartbeatManager>,
worker_heartbeat_data: Option<Arc<Mutex<WorkerHeartbeatData>>>,
Member

Similar issue here - WorkerHeartbeatData is mixing a lot of concerns. Leave data as just data, and separate out the behavior. My comments on capture_heartbeat / fetch_config get at that.

Like I said in my comment on the last review round - we can just have get_worker_heartbeat_data(), which you do have, but we don't need to actually store the data; you can just construct a new one every time it's called.

}
}

// TODO: rename
// TODO: impl trait so entire struct doesn't need to be passed to Worker and SharedNamespaceWorker
Member

Shouldn't be necessary anyway, per my other comments.

@@ -404,13 +621,15 @@ impl Worker {
};

let np_metrics = metrics.with_new_attrs([nexus_poller()]);
// This starts the poller thread.
Member

Not really a thread - these are tasks. But it's kind of an uninteresting comment anyway.

Comment on lines +39 to +40
/// there will need to be some associated refactoring. // TODO: sounds like we'll need to do this
max_permits: Option<Arc<AtomicUsize>>,
Member

Potentially, yes. Let's leave proper implementations of the commands for later though, since this PR is already quite large, and they won't be working server side for a while anyway.

@yuandrew (Contributor Author)

Closing this PR in favor of separating this out into a process-wide heartbeat PR and a worker-commands PR that will come in the future.

@yuandrew closed this on Aug 18, 2025