[DRAFT] Process wide worker heartbeat #962

Closed
40 changes: 40 additions & 0 deletions client/src/lib.rs
@@ -1153,12 +1153,50 @@ pub struct Priority {
/// The default priority is (min+max)/2. With the default max of 5 and min of
/// 1, that comes out to 3.
pub priority_key: u32,

/// The fairness key is a short string used as the key for a fairness
/// balancing mechanism. It may correspond to a tenant id, or to a fixed
/// string like "high" or "low". The default is the empty string.
///
/// The fairness mechanism attempts to dispatch tasks for a given key in
/// proportion to its weight. For example, using a thousand distinct tenant
/// ids, each with a weight of 1.0 (the default), will result in each tenant
/// getting a roughly equal share of task dispatch throughput.
///
/// (Note: this does not imply equal share of worker capacity! Fairness
/// decisions are made based on queue statistics, not
/// current worker load.)
///
/// As another example, using keys "high" and "low" with weight 9.0 and 1.0
/// respectively will prefer dispatching "high" tasks over "low" tasks at a
/// 9:1 ratio, while allowing either key to use all worker capacity if the
/// other is not present.
///
/// All fairness mechanisms, including rate limits, are best-effort and
/// probabilistic. The results may not match what a "perfect" algorithm with
/// infinite resources would produce. The more unique keys are used, the less
/// accurate the results will be.
///
/// Fairness keys are limited to 64 bytes.
pub fairness_key: String,

/// For flexibility, the fairness weight for a task can come from multiple
/// sources. From highest to lowest precedence:
/// 1. Weights for a small set of keys can be overridden in task queue
/// configuration with an API.
/// 2. It can be attached to the workflow/activity in this field.
/// 3. The default weight of 1.0 will be used.
///
/// Weight values are clamped to the range [0.001, 1000].
pub fairness_weight: u32, // TODO: f32 with eq
}

impl From<Priority> for common::v1::Priority {
fn from(priority: Priority) -> Self {
common::v1::Priority {
priority_key: priority.priority_key as i32,
fairness_key: priority.fairness_key,
fairness_weight: priority.fairness_weight as f32,
}
}
}
@@ -1167,6 +1205,8 @@ impl From<common::v1::Priority> for Priority {
fn from(priority: common::v1::Priority) -> Self {
Self {
priority_key: priority.priority_key as u32,
fairness_key: priority.fairness_key,
fairness_weight: priority.fairness_weight as u32,
}
}
}
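To make the new fields concrete, here is a minimal sketch of how a caller might populate them. The struct below is a local mirror of the fields added in this diff rather than the real `temporal_client::Priority`, and the `Default` derive is an assumption for illustration only:

```rust
// Illustration only: mirrors the fields added above, not the real type.
#[derive(Default, Debug)]
struct Priority {
    priority_key: u32,
    fairness_key: String, // limited to 64 bytes
    fairness_weight: u32, // the diff notes a TODO to make this f32
}

fn main() {
    // A per-tenant fairness key: each tenant gets a roughly equal share of
    // dispatch throughput.
    let per_tenant = Priority {
        fairness_key: "tenant-1234".to_string(),
        fairness_weight: 1, // the documented default weight is 1.0
        ..Default::default()
    };

    // A two-tier setup: "high" is preferred over "low" at roughly 9:1, while
    // either tier may use all worker capacity when the other is idle.
    let high = Priority {
        fairness_key: "high".to_string(),
        fairness_weight: 9,
        ..Default::default()
    };

    println!("{per_tenant:?}\n{high:?}");
}
```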
15 changes: 7 additions & 8 deletions core-api/src/lib.rs
@@ -4,12 +4,8 @@ pub mod errors;
pub mod telemetry;
pub mod worker;

use crate::{
errors::{
CompleteActivityError, CompleteNexusError, CompleteWfError, PollError,
WorkerValidationError,
},
worker::WorkerConfig,
use crate::errors::{
CompleteActivityError, CompleteNexusError, CompleteWfError, PollError, WorkerValidationError,
};
use temporal_sdk_core_protos::coresdk::{
ActivityHeartbeat, ActivityTaskCompletion,
@@ -110,8 +106,11 @@ pub trait Worker: Send + Sync {
/// a warning.
fn request_workflow_eviction(&self, run_id: &str);

/// Return this worker's config
fn get_config(&self) -> &WorkerConfig;
/// Return this worker's task queue
fn get_task_queue(&self) -> String;

/// Return this worker's namespace
fn get_namespace(&self) -> String;

/// Initiate shutdown. See [Worker::shutdown], this is just a sync version that starts the
/// process. You can then wait on `shutdown` or [Worker::finalize_shutdown].
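A hedged sketch of how a call site might adapt to the narrowed trait: where it previously pulled the task queue and namespace out of `get_config()` (field names assumed, not shown in this diff), it now calls the dedicated accessors:

```rust
use temporal_sdk_core_api::Worker;

// Sketch only: assumes the caller needed nothing from the old `get_config()`
// beyond the task queue and namespace.
fn describe_worker(worker: &dyn Worker) -> String {
    // Before (field name assumed): worker.get_config().task_queue.clone()
    format!(
        "worker polling '{}' in namespace '{}'",
        worker.get_task_queue(),
        worker.get_namespace()
    )
}
```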
6 changes: 0 additions & 6 deletions core-api/src/worker.rs
@@ -161,12 +161,6 @@ pub struct WorkerConfig {

/// A versioning strategy for this worker.
pub versioning_strategy: WorkerVersioningStrategy,

/// The interval within which the worker will send a heartbeat.
/// The timer is reset on each existing RPC call that also happens to send this data, like
/// `PollWorkflowTaskQueueRequest`.
#[builder(default = "Duration::from_secs(60)")]
pub heartbeat_interval: Duration,
}

impl WorkerConfig {
1 change: 1 addition & 0 deletions core-c-bridge/include/temporal-sdk-core-c-bridge.h
@@ -329,6 +329,7 @@ typedef struct TemporalCoreTelemetryOptions {

typedef struct TemporalCoreRuntimeOptions {
const struct TemporalCoreTelemetryOptions *telemetry;
uint64_t heartbeat_duration_millis;
} TemporalCoreRuntimeOptions;

typedef struct TemporalCoreTestServerOptions {
10 changes: 8 additions & 2 deletions core-c-bridge/src/runtime.rs
@@ -15,6 +15,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::UNIX_EPOCH;
use temporal_sdk_core::CoreRuntime;
use temporal_sdk_core::RuntimeOptions as CoreRuntimeOptions;
use temporal_sdk_core::TokioRuntimeBuilder;
use temporal_sdk_core::telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter};
use temporal_sdk_core_api::telemetry::HistogramBucketOverrides;
@@ -31,6 +32,7 @@ use url::Url;
#[repr(C)]
pub struct RuntimeOptions {
pub telemetry: *const TelemetryOptions,
pub heartbeat_duration_millis: u64,
}

#[repr(C)]
@@ -143,7 +145,7 @@ pub extern "C" fn temporal_core_runtime_new(options: *const RuntimeOptions) -> R
let mut runtime = Runtime {
core: Arc::new(
CoreRuntime::new(
CoreTelemetryOptions::default(),
CoreRuntimeOptions::default(),
TokioRuntimeBuilder::default(),
)
.unwrap(),
@@ -238,9 +240,13 @@ impl Runtime {
} else {
CoreTelemetryOptions::default()
};
let core_runtime_options = CoreRuntimeOptions::new(
telemetry_options,
Some(Duration::from_millis(options.heartbeat_duration_millis)),
);

// Build core runtime
let mut core = CoreRuntime::new(telemetry_options, TokioRuntimeBuilder::default())?;
let mut core = CoreRuntime::new(core_runtime_options, TokioRuntimeBuilder::default())?;

// We late-bind the metrics after core runtime is created since it needs
// the Tokio handle
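For Rust callers building the runtime directly, the equivalent wiring might look like the sketch below. The two-argument `RuntimeOptions::new(telemetry, heartbeat_interval)` shape is taken from this diff; the builder call and the 60-second interval are assumptions:

```rust
use std::time::Duration;
use temporal_sdk_core::{CoreRuntime, RuntimeOptions, TokioRuntimeBuilder};
use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder;

fn build_runtime() -> CoreRuntime {
    // Default telemetry; real callers would configure exporters here.
    let telemetry = TelemetryOptionsBuilder::default()
        .build()
        .expect("default telemetry options");

    // Enable the process-wide worker heartbeat at a 60s interval. The
    // benchmark code in this PR passes `None` here instead.
    let options = RuntimeOptions::new(telemetry, Some(Duration::from_secs(60)));

    CoreRuntime::new(options, TokioRuntimeBuilder::default())
        .expect("core runtime construction")
}
```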
1 change: 1 addition & 0 deletions core-c-bridge/src/tests/context.rs
@@ -147,6 +147,7 @@ impl Context {

let RuntimeOrFail { runtime, fail } = temporal_core_runtime_new(&RuntimeOptions {
telemetry: std::ptr::null(),
heartbeat_duration_millis: 0,
});

if let Some(fail) = byte_array_to_string(runtime, fail) {
3 changes: 2 additions & 1 deletion core/Cargo.toml
@@ -80,7 +80,8 @@ features = ["otel_impls"]

[dependencies.temporal-sdk-core-protos]
path = "../sdk-core-protos"
features = ["history_builders"]
features = ["history_builders", "serde_serialize"]
#features = ["history_builders"]
Review comment from a repository member on lines +83 to +84:
This shouldn't need to have changed I think


[dependencies.temporal-client]
path = "../client"
5 changes: 3 additions & 2 deletions core/benches/workflow_replay.rs
@@ -6,7 +6,7 @@ use std::{
time::Duration,
};
use temporal_sdk::{WfContext, WorkflowFunction};
use temporal_sdk_core::{CoreRuntime, replay::HistoryForReplay};
use temporal_sdk_core::{CoreRuntime, RuntimeOptions, replay::HistoryForReplay};
use temporal_sdk_core_api::telemetry::metrics::{
MetricKeyValue, MetricParametersBuilder, NewAttributes,
};
@@ -75,7 +75,8 @@ pub fn bench_metrics(c: &mut Criterion) {
let _tokio = tokio_runtime.enter();
let (mut telemopts, addr, _aborter) = prom_metrics(None);
telemopts.logging = None;
let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
let runtimeopts = RuntimeOptions::new(telemopts, None);
let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
let meter = rt.telemetry().get_metric_meter().unwrap();

c.bench_function("Record with new attributes on each call", move |b| {
16 changes: 8 additions & 8 deletions core/src/abstractions.rs
@@ -36,8 +36,8 @@ pub(crate) struct MeteredPermitDealer<SK: SlotKind> {
/// is at this number, no more permits will be requested from the supplier until one is freed.
/// This avoids requesting slots when we are at the workflow cache size limit. If and when
/// we add user-defined cache sizing, that logic will need to live with the supplier and
/// there will need to be some associated refactoring.
max_permits: Option<usize>,
/// there will need to be some associated refactoring. // TODO: sounds like we'll need to do this
max_permits: Option<Arc<AtomicUsize>>,
Review comment from a repository member on lines +39 to +40:
Potentially, yes. Let's leave proper implementations of the commands for later though, since this PR is already quite large, and they won't be working server side for a while anyway.

metrics_ctx: MetricsContext,
meter: Option<TemporalMeter>,
/// Only applies to permit dealers for workflow tasks. True if this permit dealer is associated
@@ -61,7 +61,7 @@
pub(crate) fn new(
supplier: Arc<dyn SlotSupplier<SlotKind = SK> + Send + Sync>,
metrics_ctx: MetricsContext,
max_permits: Option<usize>,
max_permits: Option<Arc<AtomicUsize>>,
context_data: Arc<PermitDealerContextData>,
meter: Option<TemporalMeter>,
) -> Self {
@@ -88,11 +88,11 @@
}

pub(crate) async fn acquire_owned(&self) -> OwnedMeteredSemPermit<SK> {
if let Some(max) = self.max_permits {
if let Some(ref max) = self.max_permits {
self.extant_permits
.1
.clone()
.wait_for(|&ep| ep < max)
.wait_for(|&ep| ep < max.load(Ordering::Relaxed))
.await
.expect("Extant permit channel is never closed");
}
@@ -101,8 +101,8 @@
}

pub(crate) fn try_acquire_owned(&self) -> Result<OwnedMeteredSemPermit<SK>, ()> {
if let Some(max) = self.max_permits
&& *self.extant_permits.1.borrow() >= max
if let Some(ref max) = self.max_permits
&& *self.extant_permits.1.borrow() >= max.load(Ordering::Relaxed)
{
return Err(());
}
@@ -482,7 +482,7 @@ pub(crate) mod tests {
#[tokio::test]
async fn respects_max_extant_permits() {
let mut sem = fixed_size_permit_dealer::<WorkflowSlotKind>(2);
sem.max_permits = Some(1);
sem.max_permits = Some(Arc::new(AtomicUsize::new(1)));
let perm = sem.try_acquire_owned().unwrap();
sem.try_acquire_owned().unwrap_err();
let acquire_fut = sem.acquire_owned();
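The move from `Option<usize>` to `Option<Arc<AtomicUsize>>` is what allows the permit cap to be shared with, and adjusted by, code outside the dealer while permits are being handed out. A standalone sketch of that pattern (not the actual `MeteredPermitDealer` API):

```rust
use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};

fn main() {
    // One shared cap, cloned into whatever needs to consult it.
    let max_permits = Arc::new(AtomicUsize::new(2));
    let dealer_view = Arc::clone(&max_permits);

    // A permit request checks the *current* value each time...
    let extant = 1usize;
    assert!(extant < dealer_view.load(Ordering::Relaxed));

    // ...so an external update (e.g. a task queue configuration command)
    // takes effect without rebuilding the dealer.
    max_permits.store(1, Ordering::Relaxed);
    assert!(extant >= dealer_view.load(Ordering::Relaxed));
}
```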
1 change: 1 addition & 0 deletions core/src/core_tests/activity_tasks.rs
@@ -1235,6 +1235,7 @@ async fn pass_activity_summary_to_metadata() {
..Default::default()
})
.await;

Ok(().into())
});
worker
5 changes: 3 additions & 2 deletions core/src/core_tests/workflow_tasks.rs
@@ -3254,11 +3254,12 @@ async fn both_normal_and_sticky_pollers_poll_concurrently() {
.nonsticky_to_sticky_poll_ratio(0.2)
.no_remote_activities(true)
.build()
.unwrap(),
.unwrap()
.into(),
Some("stickytq".to_string()),
Arc::new(mock_client),
None,
None,
false,
);

for _ in 1..50 {