From ace8e293c6676d462e4c78343e052d93b35aee81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Sun, 6 Jul 2025 11:12:58 +0200 Subject: [PATCH 1/5] Optimize `read2` streaming Before, the whole stdout/stderr buffer would be copied unnecessarily after each data chunk was received. --- collector/src/lib.rs | 19 ++++++++++--------- collector/src/utils/read2.rs | 9 +++++---- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/collector/src/lib.rs b/collector/src/lib.rs index 91210dce6..ab47632a4 100644 --- a/collector/src/lib.rs +++ b/collector/src/lib.rs @@ -177,11 +177,12 @@ fn run_command_with_output(cmd: &mut Command) -> anyhow::Result .spawn() .with_context(|| format!("failed to spawn process for cmd: {:?}", cmd))?; - let mut stdout = Vec::new(); - let mut stderr = Vec::new(); - let mut stdout_writer = std::io::LineWriter::new(std::io::stdout()); - let mut stderr_writer = std::io::LineWriter::new(std::io::stderr()); - read2::read2( + let mut stdout_writer = std::io::LineWriter::new(std::io::stdout().lock()); + let mut stderr_writer = std::io::LineWriter::new(std::io::stderr().lock()); + + let mut stdout_written = 0; + let mut stderr_written = 0; + let (stdout, stderr) = read2::read2( child.stdout.take().unwrap(), child.stderr.take().unwrap(), &mut |is_stdout, buffer, _is_done| { @@ -189,15 +190,15 @@ fn run_command_with_output(cmd: &mut Command) -> anyhow::Result if log::log_enabled!(target: "raw_cargo_messages", log::Level::Trace) { use std::io::Write; if is_stdout { - stdout_writer.write_all(&buffer[stdout.len()..]).unwrap(); + stdout_writer.write_all(&buffer[stdout_written..]).unwrap(); } else { - stderr_writer.write_all(&buffer[stderr.len()..]).unwrap(); + stderr_writer.write_all(&buffer[stderr_written..]).unwrap(); } } if is_stdout { - stdout = buffer.clone(); + stdout_written = buffer.len(); } else { - stderr = buffer.clone(); + stderr_written = buffer.len(); } }, )?; diff --git a/collector/src/utils/read2.rs b/collector/src/utils/read2.rs index 116645de0..987bccff2 100644 --- a/collector/src/utils/read2.rs +++ b/collector/src/utils/read2.rs @@ -31,11 +31,12 @@ mod imp { use std::os::unix::prelude::*; use std::process::{ChildStderr, ChildStdout}; + /// Returns (stdout, stderr). pub fn read2( mut out_pipe: ChildStdout, mut err_pipe: ChildStderr, data: &mut dyn FnMut(bool, &mut Vec, bool), - ) -> io::Result<()> { + ) -> io::Result<(Vec, Vec)> { unsafe { libc::fcntl(out_pipe.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK); libc::fcntl(err_pipe.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK); @@ -93,7 +94,7 @@ mod imp { } data(true, &mut out, out_done); } - Ok(()) + Ok((out, err)) } } @@ -120,7 +121,7 @@ mod imp { out_pipe: ChildStdout, err_pipe: ChildStderr, data: &mut dyn FnMut(bool, &mut Vec, bool), - ) -> io::Result<()> { + ) -> io::Result<(Vec, Vec)> { let mut out = Vec::new(); let mut err = Vec::new(); @@ -151,7 +152,7 @@ mod imp { } } - Ok(()) + Ok((out, err)) } } From d7d577d17c9d3f9185e19ec23b13e15220e8ab59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Sun, 6 Jul 2025 11:16:03 +0200 Subject: [PATCH 2/5] Remove manual usage of `run_command_with_output` --- collector/src/runtime/mod.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/collector/src/runtime/mod.rs b/collector/src/runtime/mod.rs index a18d04afe..791c4bf72 100644 --- a/collector/src/runtime/mod.rs +++ b/collector/src/runtime/mod.rs @@ -15,7 +15,7 @@ pub use benchmark::{ use database::{ArtifactIdNumber, CollectionId, Connection}; use crate::utils::git::get_rustc_perf_commit; -use crate::{run_command_with_output, CollectorCtx}; +use crate::{command_output, CollectorCtx}; mod benchmark; mod profile; @@ -218,15 +218,7 @@ fn execute_runtime_benchmark_binary( command.args(["--include", &filter.include.join(",")]); } - let output = run_command_with_output(&mut command)?; - if !output.status.success() { - return Err(anyhow::anyhow!( - "Process finished with exit code {}\n{}", - output.status.code().unwrap_or(-1), - String::from_utf8_lossy(&output.stderr) - )); - } - + let output = command_output(&mut command)?; let reader = BufReader::new(Cursor::new(output.stdout)); Ok(reader.lines().map(|line| { Ok(line.and_then(|line| Ok(serde_json::from_str::(&line)?))?) From 6a28881006662fc6168698f5a8df1f5365459312 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Sun, 6 Jul 2025 11:18:03 +0200 Subject: [PATCH 3/5] Stream output of the rustc benchmark --- collector/src/compile/execute/rustc.rs | 2 +- collector/src/lib.rs | 29 +++++++++++++++++++------- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/collector/src/compile/execute/rustc.rs b/collector/src/compile/execute/rustc.rs index c2cc3756f..57ca0bf83 100644 --- a/collector/src/compile/execute/rustc.rs +++ b/collector/src/compile/execute/rustc.rs @@ -105,7 +105,7 @@ async fn record( .context("configuring")?; assert!(status.success(), "configure successful"); - let output = crate::command_output( + let output = crate::command_output_stream( Command::new("python3") .arg( checkout diff --git a/collector/src/lib.rs b/collector/src/lib.rs index ab47632a4..94b9244c6 100644 --- a/collector/src/lib.rs +++ b/collector/src/lib.rs @@ -168,7 +168,12 @@ pub fn run_command(cmd: &mut Command) -> anyhow::Result<()> { Ok(()) } -fn run_command_with_output(cmd: &mut Command) -> anyhow::Result { +/// If `stream_output` is true, stdout/stderr of `cmd` should be streamed to stdout/stderr of the +/// current process, in addition to being captured. +fn run_command_with_output( + cmd: &mut Command, + stream_output: bool, +) -> anyhow::Result { use anyhow::Context; use utils::read2; let mut child = cmd @@ -187,7 +192,7 @@ fn run_command_with_output(cmd: &mut Command) -> anyhow::Result child.stderr.take().unwrap(), &mut |is_stdout, buffer, _is_done| { // Send output if trace logging is enabled - if log::log_enabled!(target: "raw_cargo_messages", log::Level::Trace) { + if stream_output || log::log_enabled!(target: "raw_cargo_messages", log::Level::Trace) { use std::io::Write; if is_stdout { stdout_writer.write_all(&buffer[stdout_written..]).unwrap(); @@ -215,18 +220,28 @@ fn run_command_with_output(cmd: &mut Command) -> anyhow::Result } pub fn command_output(cmd: &mut Command) -> anyhow::Result { - let output = run_command_with_output(cmd)?; + let output = run_command_with_output(cmd, false)?; + check_command_output(&output)?; + Ok(output) +} + +pub fn command_output_stream(cmd: &mut Command) -> anyhow::Result { + let output = run_command_with_output(cmd, true)?; + check_command_output(&output)?; + Ok(output) +} +fn check_command_output(output: &process::Output) -> anyhow::Result<()> { if !output.status.success() { - return Err(anyhow::anyhow!( + Err(anyhow::anyhow!( "expected success, got {}\n\nstderr={}\n\n stdout={}\n", output.status, String::from_utf8_lossy(&output.stderr), String::from_utf8_lossy(&output.stdout) - )); + )) + } else { + Ok(()) } - - Ok(output) } pub async fn async_command_output( From 9a9d5fcc293bff033b91013c1fcbfba1e3696969 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Sun, 6 Jul 2025 11:30:44 +0200 Subject: [PATCH 4/5] Fix Windows compilation --- collector/src/utils/fs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/src/utils/fs.rs b/collector/src/utils/fs.rs index e38176df4..a32099a94 100644 --- a/collector/src/utils/fs.rs +++ b/collector/src/utils/fs.rs @@ -130,7 +130,7 @@ pub fn robocopy( cmd.arg(arg.as_ref()); } - let output = run_command_with_output(&mut cmd)?; + let output = run_command_with_output(&mut cmd, false)?; if output.status.code() >= Some(8) { // robocopy returns 0-7 on success From 06cbf71e6e37776dc1c0e545f9e7ee972b50a902 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Sun, 6 Jul 2025 12:00:34 +0200 Subject: [PATCH 5/5] Remove duplicated command log Before, we printed the command twice with trace logging, which was spammy in logs. --- collector/src/compile/execute/mod.rs | 2 -- collector/src/lib.rs | 13 ++++--------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/collector/src/compile/execute/mod.rs b/collector/src/compile/execute/mod.rs index 0bf5b210c..99e37f473 100644 --- a/collector/src/compile/execute/mod.rs +++ b/collector/src/compile/execute/mod.rs @@ -446,8 +446,6 @@ impl<'a> CargoProcess<'a> { client.configure(&mut cmd); } - log::debug!("{:?}", cmd); - let cmd = tokio::process::Command::from(cmd); let output = async_command_output(cmd).await?; diff --git a/collector/src/lib.rs b/collector/src/lib.rs index 94b9244c6..d54d38bb7 100644 --- a/collector/src/lib.rs +++ b/collector/src/lib.rs @@ -249,6 +249,8 @@ pub async fn async_command_output( ) -> anyhow::Result { use anyhow::Context; + log::debug!("Executing {:?}", cmd); + let start = Instant::now(); let child = cmd .stdout(Stdio::piped()) @@ -256,16 +258,9 @@ pub async fn async_command_output( .spawn() .with_context(|| format!("failed to spawn process for cmd: {:?}", cmd))?; let output = child.wait_with_output().await?; - log::trace!("command {cmd:?} took {} ms", start.elapsed().as_millis()); + log::trace!("Command took {} ms", start.elapsed().as_millis()); - if !output.status.success() { - return Err(anyhow::anyhow!( - "expected success, got {}\n\nstderr={}\n\n stdout={}\n", - output.status, - String::from_utf8_lossy(&output.stderr), - String::from_utf8_lossy(&output.stdout) - )); - } + check_command_output(&output)?; Ok(output) }