Skip to content

Commit 1d4bafb

Browse files
committed
Some quick benching
1 parent 704848a commit 1d4bafb

File tree

4 files changed

+132
-21
lines changed

4 files changed

+132
-21
lines changed

core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ path = "../fsm"
9191
assert_matches = "1.4"
9292
bimap = "0.6.1"
9393
clap = { version = "4.0", features = ["derive"] }
94-
criterion = "0.5"
94+
criterion = { version = "0.6", features = ["async", "async_tokio"] }
9595
rstest = "0.25"
9696
temporal-sdk-core-test-utils = { path = "../test-utils" }
9797
temporal-sdk = { path = "../sdk" }

core/benches/workflow_replay.rs

Lines changed: 114 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,27 @@
1-
use criterion::{Criterion, criterion_group, criterion_main};
1+
use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
22
use futures_util::StreamExt;
3-
use std::time::Duration;
3+
use std::{
4+
sync::{Arc, mpsc},
5+
thread,
6+
time::Duration,
7+
};
48
use temporal_sdk::{WfContext, WorkflowFunction};
5-
use temporal_sdk_core::replay::HistoryForReplay;
9+
use temporal_sdk_core::{CoreRuntime, replay::HistoryForReplay};
10+
use temporal_sdk_core_api::telemetry::metrics::{
11+
MetricKeyValue, MetricParametersBuilder, NewAttributes,
12+
};
613
use temporal_sdk_core_protos::DEFAULT_WORKFLOW_TYPE;
7-
use temporal_sdk_core_test_utils::{canned_histories, replay_sdk_worker};
14+
use temporal_sdk_core_test_utils::{
15+
DONT_AUTO_INIT_INTEG_TELEM, canned_histories, prom_metrics, replay_sdk_worker,
16+
};
817

918
pub fn criterion_benchmark(c: &mut Criterion) {
1019
let tokio_runtime = tokio::runtime::Builder::new_current_thread()
1120
.enable_time()
1221
.build()
1322
.unwrap();
1423
let _g = tokio_runtime.enter();
24+
DONT_AUTO_INIT_INTEG_TELEM.set(true);
1525

1626
let num_timers = 10;
1727
let t = canned_histories::long_sequential_timers(num_timers as usize);
@@ -21,14 +31,17 @@ pub fn criterion_benchmark(c: &mut Criterion) {
2131
);
2232

2333
c.bench_function("Small history replay", |b| {
24-
b.iter(|| {
25-
tokio_runtime.block_on(async {
34+
b.to_async(&tokio_runtime).iter_batched(
35+
|| {
2636
let func = timers_wf(num_timers);
27-
let mut worker = replay_sdk_worker([hist.clone()]);
37+
(func, replay_sdk_worker([hist.clone()]))
38+
},
39+
|(func, mut worker)| async move {
2840
worker.register_wf(DEFAULT_WORKFLOW_TYPE, func);
2941
worker.run().await.unwrap();
30-
})
31-
})
42+
},
43+
BatchSize::SmallInput,
44+
)
3245
});
3346

3447
let num_tasks = 50;
@@ -39,18 +52,104 @@ pub fn criterion_benchmark(c: &mut Criterion) {
3952
);
4053

4154
c.bench_function("Large payloads history replay", |b| {
42-
b.iter(|| {
43-
tokio_runtime.block_on(async {
55+
b.to_async(&tokio_runtime).iter_batched(
56+
|| {
4457
let func = big_signals_wf(num_tasks);
45-
let mut worker = replay_sdk_worker([hist.clone()]);
58+
(func, replay_sdk_worker([hist.clone()]))
59+
},
60+
|(func, mut worker)| async move {
4661
worker.register_wf(DEFAULT_WORKFLOW_TYPE, func);
4762
worker.run().await.unwrap();
48-
})
49-
})
63+
},
64+
BatchSize::SmallInput,
65+
)
5066
});
5167
}
5268

53-
criterion_group!(benches, criterion_benchmark);
69+
pub fn bench_metrics(c: &mut Criterion) {
70+
DONT_AUTO_INIT_INTEG_TELEM.set(true);
71+
let tokio_runtime = tokio::runtime::Builder::new_current_thread()
72+
.enable_all()
73+
.build()
74+
.unwrap();
75+
let _tokio = tokio_runtime.enter();
76+
let (mut telemopts, addr, _aborter) = prom_metrics(None);
77+
telemopts.logging = None;
78+
let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
79+
let meter = rt.telemetry().get_metric_meter().unwrap();
80+
81+
c.bench_function("Record with new attributes on each call", move |b| {
82+
b.iter_batched(
83+
|| {
84+
let c = meter.counter(
85+
MetricParametersBuilder::default()
86+
.name("c")
87+
.build()
88+
.unwrap(),
89+
);
90+
let h = meter.histogram(
91+
MetricParametersBuilder::default()
92+
.name("h")
93+
.build()
94+
.unwrap(),
95+
);
96+
let g = meter.gauge(
97+
MetricParametersBuilder::default()
98+
.name("g")
99+
.build()
100+
.unwrap(),
101+
);
102+
103+
let vals = [1, 2, 3, 4, 5];
104+
let labels = ["l1", "l2"];
105+
106+
let (start_tx, start_rx) = mpsc::channel();
107+
let start_rx = Arc::new(std::sync::Mutex::new(start_rx));
108+
109+
let mut thread_handles = Vec::new();
110+
for _ in 0..3 {
111+
let c = c.clone();
112+
let h = h.clone();
113+
let g = g.clone();
114+
let meter = meter.clone();
115+
let start_rx = start_rx.clone();
116+
117+
let handle = thread::spawn(move || {
118+
// Wait for start signal
119+
let _ = start_rx.lock().unwrap().recv();
120+
121+
for _ in 1..=100 {
122+
for &val in &vals {
123+
for &label in &labels {
124+
let attribs = meter.new_attributes(NewAttributes::from(vec![
125+
MetricKeyValue::new("label", label),
126+
]));
127+
c.add(val, &attribs);
128+
h.record(val, &attribs);
129+
g.record(val, &attribs);
130+
}
131+
}
132+
}
133+
});
134+
thread_handles.push(handle);
135+
}
136+
137+
(start_tx, thread_handles)
138+
},
139+
|(start_tx, thread_handles)| {
140+
for _ in 0..3 {
141+
let _ = start_tx.send(());
142+
}
143+
for handle in thread_handles {
144+
let _ = handle.join();
145+
}
146+
},
147+
BatchSize::SmallInput,
148+
)
149+
});
150+
}
151+
152+
criterion_group!(benches, criterion_benchmark, bench_metrics);
54153
criterion_main!(benches);
55154

56155
fn timers_wf(num_timers: u32) -> WorkflowFunction {

test-utils/src/lib.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use parking_lot::Mutex;
1818
use prost::Message;
1919
use rand::Rng;
2020
use std::{
21+
cell::Cell,
2122
convert::TryFrom,
2223
env,
2324
future::Future,
@@ -171,8 +172,15 @@ pub async fn history_from_proto_binary(path_from_root: &str) -> Result<History,
171172
Ok(History::decode(&*bytes)?)
172173
}
173174

175+
thread_local! {
176+
/// Can be set true to disable auto-initialization of integ-test telemetry.
177+
pub static DONT_AUTO_INIT_INTEG_TELEM: Cell<bool> = const { Cell::new(false) };
178+
}
174179
static INTEG_TESTS_RT: std::sync::OnceLock<CoreRuntime> = std::sync::OnceLock::new();
175-
pub fn init_integ_telem() -> &'static CoreRuntime {
180+
pub fn init_integ_telem() {
181+
if DONT_AUTO_INIT_INTEG_TELEM.get() {
182+
return;
183+
}
176184
INTEG_TESTS_RT.get_or_init(|| {
177185
let telemetry_options = get_integ_telem_options();
178186
let rt =
@@ -181,7 +189,7 @@ pub fn init_integ_telem() -> &'static CoreRuntime {
181189
let _ = tracing::subscriber::set_global_default(sub);
182190
}
183191
rt
184-
})
192+
});
185193
}
186194

187195
/// Implements a builder pattern to help integ tests initialize core and create workflows

tests/heavy_tests.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@ use std::{
1212
};
1313
use temporal_client::{GetWorkflowResultOpts, WfClientExt, WorkflowClientTrait, WorkflowOptions};
1414
use temporal_sdk::{ActContext, ActivityOptions, WfContext, WorkflowResult};
15-
use temporal_sdk_core::{ResourceBasedTuner, ResourceSlotOptions};
15+
use temporal_sdk_core::{CoreRuntime, ResourceBasedTuner, ResourceSlotOptions};
1616
use temporal_sdk_core_api::worker::PollerBehavior;
1717
use temporal_sdk_core_protos::{
1818
coresdk::{AsJsonPayloadExt, workflow_commands::ActivityCancellationType},
1919
temporal::api::enums::v1::WorkflowIdReusePolicy,
2020
};
21-
use temporal_sdk_core_test_utils::{CoreWfStarter, rand_6_chars, workflows::la_problem_workflow};
21+
use temporal_sdk_core_test_utils::{
22+
CoreWfStarter, prom_metrics, rand_6_chars, workflows::la_problem_workflow,
23+
};
2224

2325
mod fuzzy_workflow;
2426

@@ -183,7 +185,9 @@ async fn workflow_load() {
183185
const SIGNAME: &str = "signame";
184186
let num_workflows = 500;
185187
let wf_name = "workflow_load";
186-
let mut starter = CoreWfStarter::new("workflow_load");
188+
let (telemopts, _, _aborter) = prom_metrics(None);
189+
let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
190+
let mut starter = CoreWfStarter::new_with_runtime("workflow_load", rt);
187191
starter
188192
.worker_config
189193
.max_outstanding_workflow_tasks(5_usize)

0 commit comments

Comments
 (0)