From 077d57cbefdbe9b448da1cc486b41079e1728541 Mon Sep 17 00:00:00 2001 From: Brandon Kiser Date: Fri, 27 Jun 2025 16:47:49 -0700 Subject: [PATCH 1/3] feat: add request metadata to the conversation history and emitted telemetry --- crates/chat-cli/build.rs | 2 +- crates/chat-cli/src/cli/chat/cli/compact.rs | 2 +- crates/chat-cli/src/cli/chat/conversation.rs | 152 ++++--- crates/chat-cli/src/cli/chat/mod.rs | 409 +++++++++++++------ crates/chat-cli/src/cli/chat/parser.rs | 283 +++++++++++-- crates/chat-cli/src/telemetry/core.rs | 85 +++- crates/chat-cli/src/telemetry/definitions.rs | 7 + crates/chat-cli/src/telemetry/mod.rs | 47 ++- crates/chat-cli/telemetry_definitions.json | 84 +++- 9 files changed, 834 insertions(+), 237 deletions(-) diff --git a/crates/chat-cli/build.rs b/crates/chat-cli/build.rs index f097298af8..fe39e2dbc3 100644 --- a/crates/chat-cli/build.rs +++ b/crates/chat-cli/build.rs @@ -122,7 +122,7 @@ fn main() { quote::quote!( #[doc = #description] - #[derive(Debug, Clone, PartialEq)] + #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] #[non_exhaustive] pub enum #name { #( diff --git a/crates/chat-cli/src/cli/chat/cli/compact.rs b/crates/chat-cli/src/cli/chat/cli/compact.rs index f132ef27e8..18b2bea2a7 100644 --- a/crates/chat-cli/src/cli/chat/cli/compact.rs +++ b/crates/chat-cli/src/cli/chat/cli/compact.rs @@ -49,7 +49,7 @@ pub struct CompactArgs { } impl CompactArgs { - pub async fn execute(self, os: &Os, session: &mut ChatSession) -> Result { + pub async fn execute(self, os: &mut Os, session: &mut ChatSession) -> Result { let default = CompactStrategy::default(); let prompt = if self.prompt.is_empty() { None diff --git a/crates/chat-cli/src/cli/chat/conversation.rs b/crates/chat-cli/src/cli/chat/conversation.rs index 1d5e040b26..63f0940903 100644 --- a/crates/chat-cli/src/cli/chat/conversation.rs +++ b/crates/chat-cli/src/cli/chat/conversation.rs @@ -32,6 +32,7 @@ use super::message::{ ToolUseResult, UserMessage, }; +use 
super::parser::RequestMetadata; use super::token_counter::{ CharCount, CharCounter, @@ -65,6 +66,14 @@ use crate::os::Os; const CONTEXT_ENTRY_START_HEADER: &str = "--- CONTEXT ENTRY BEGIN ---\n"; const CONTEXT_ENTRY_END_HEADER: &str = "--- CONTEXT ENTRY END ---\n\n"; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HistoryEntry { + user: UserMessage, + assistant: AssistantMessage, + #[serde(default)] + request_metadata: Option, +} + /// Tracks state related to an ongoing conversation. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ConversationState { @@ -73,7 +82,7 @@ pub struct ConversationState { /// The next user message to be sent as part of the conversation. Required to be [Some] before /// calling [Self::as_sendable_conversation_state]. next_message: Option, - history: VecDeque<(UserMessage, AssistantMessage)>, + history: VecDeque, /// The range in the history sendable to the backend (start inclusive, end exclusive). valid_history_range: (usize, usize), /// Similar to history in that stores user and assistant responses, except that it is not used @@ -90,7 +99,7 @@ pub struct ConversationState { /// Cached value representing the length of the user context message. context_message_length: Option, /// Stores the latest conversation summary created by /compact - latest_summary: Option, + latest_summary: Option<(String, RequestMetadata)>, #[serde(skip)] pub agents: Agents, /// Model explicitly selected by the user in this conversation state via `/model`. 
@@ -141,10 +150,10 @@ impl ConversationState { } pub fn latest_summary(&self) -> Option<&str> { - self.latest_summary.as_deref() + self.latest_summary.as_ref().map(|(s, _)| s.as_str()) } - pub fn history(&self) -> &VecDeque<(UserMessage, AssistantMessage)> { + pub fn history(&self) -> &VecDeque { &self.history } @@ -177,10 +186,14 @@ impl ConversationState { }, } if candidate_asst.is_some() && candidate_user.is_some() { - let asst = candidate_asst.take().unwrap(); + let assistant = candidate_asst.take().unwrap(); let user = candidate_user.take().unwrap(); - self.append_assistant_transcript(&asst); - self.history.push_back((user, asst)); + self.append_assistant_transcript(&assistant); + self.history.push_back(HistoryEntry { + user, + assistant, + request_metadata: None, + }); } } Some(last_msg.content.to_string()) @@ -212,12 +225,21 @@ impl ConversationState { } /// Sets the response message according to the currently set [Self::next_message]. - pub fn push_assistant_message(&mut self, os: &mut Os, message: AssistantMessage) { + pub fn push_assistant_message( + &mut self, + os: &mut Os, + message: AssistantMessage, + request_metadata: Option, + ) { debug_assert!(self.next_message.is_some(), "next_message should exist"); let next_user_message = self.next_message.take().expect("next user message should exist"); self.append_assistant_transcript(&message); - self.history.push_back((next_user_message, message)); + self.history.push_back(HistoryEntry { + user: next_user_message, + assistant: message, + request_metadata, + }); if let Ok(cwd) = std::env::current_dir() { os.database.set_conversation_by_path(cwd, self).ok(); @@ -233,7 +255,23 @@ impl ConversationState { /// /// This is equivalent to `utterance_id` in the Q API. pub fn message_id(&self) -> Option<&str> { - self.history.back().and_then(|(_, msg)| msg.message_id()) + self.history + .back() + .and_then(|HistoryEntry { assistant, .. 
}| assistant.message_id()) + } + + pub fn latest_tool_use_ids(&self) -> Option { + self.history + .back() + .and_then(|HistoryEntry { assistant, .. }| assistant.tool_uses()) + .map(|tools| (tools.iter().map(|t| t.id.as_str()).collect::>().join(","))) + } + + pub fn latest_tool_use_names(&self) -> Option { + self.history + .back() + .and_then(|HistoryEntry { assistant, .. }| assistant.tool_uses()) + .map(|tools| (tools.iter().map(|t| t.name.as_str()).collect::>().join(","))) } /// Updates the history so that, when non-empty, the following invariants are in place: @@ -440,7 +478,7 @@ impl ConversationState { FILTER OUT CHAT CONVENTIONS (greetings, offers to help, etc).".to_string() }, }; - if let Some(summary) = &self.latest_summary { + if let Some((summary, _)) = &self.latest_summary { summary_content.push_str("\n\n"); summary_content.push_str(CONTEXT_ENTRY_START_HEADER); summary_content.push_str("This summary contains ALL relevant information from our previous conversation including tool uses, results, code analysis, and file operations. YOU MUST be sure to include this information when creating your summarization document.\n\n"); @@ -457,8 +495,8 @@ impl ConversationState { let mut history = conv_state.history.cloned().collect::>(); history.drain((history.len().saturating_sub(strategy.messages_to_exclude))..); if strategy.truncate_large_messages { - for (user_message, _) in &mut history { - user_message.truncate_safe(strategy.max_message_length); + for HistoryEntry { user, .. } in &mut history { + user.truncate_safe(strategy.max_message_length); } } @@ -488,10 +526,15 @@ impl ConversationState { /// `strategy` - The [CompactStrategy] used for the corresponding /// [ConversationState::create_summary_request]. 
- pub fn replace_history_with_summary(&mut self, summary: String, strategy: CompactStrategy) { + pub fn replace_history_with_summary( + &mut self, + summary: String, + strategy: CompactStrategy, + request_metadata: RequestMetadata, + ) { self.history .drain(..(self.history.len().saturating_sub(strategy.messages_to_exclude))); - self.latest_summary = Some(summary); + self.latest_summary = Some((summary, request_metadata)); } pub fn current_profile(&self) -> Option<&str> { @@ -514,10 +557,10 @@ impl ConversationState { &mut self, os: &Os, conversation_start_context: Option, - ) -> (Option>, Vec<(String, String)>) { + ) -> (Option>, Vec<(String, String)>) { let mut context_content = String::new(); let mut dropped_context_files = Vec::new(); - if let Some(summary) = &self.latest_summary { + if let Some((summary, _)) = &self.latest_summary { context_content.push_str(CONTEXT_ENTRY_START_HEADER); context_content.push_str("This summary contains ALL relevant information from our previous conversation including tool uses, results, code analysis, and file operations. 
YOU MUST reference this information when answering questions and explicitly acknowledge specific details from the summary when they're relevant to the current question.\n\n"); context_content.push_str("SUMMARY CONTENT:\n"); @@ -554,9 +597,16 @@ impl ConversationState { if !context_content.is_empty() { self.context_message_length = Some(context_content.len()); - let user_msg = UserMessage::new_prompt(context_content); - let assistant_msg = AssistantMessage::new_response(None, "I will fully incorporate this information when generating my responses, and explicitly acknowledge relevant parts of the summary when answering questions.".into()); - (Some(vec![(user_msg, assistant_msg)]), dropped_context_files) + let user = UserMessage::new_prompt(context_content); + let assistant = AssistantMessage::new_response(None, "I will fully incorporate this information when generating my responses, and explicitly acknowledge relevant parts of the summary when answering questions.".into()); + ( + Some(vec![HistoryEntry { + user, + assistant, + request_metadata: None, + }]), + dropped_context_files, + ) } else { (None, dropped_context_files) } @@ -611,11 +661,8 @@ impl ConversationState { /// /// This is intended to provide us ways to accurately assess the exact state that is sent to the /// model without having to needlessly clone and mutate [ConversationState] in strange ways. 
-pub type BackendConversationState<'a> = BackendConversationStateImpl< - 'a, - std::collections::vec_deque::Iter<'a, (UserMessage, AssistantMessage)>, - Option>, ->; +pub type BackendConversationState<'a> = + BackendConversationStateImpl<'a, std::collections::vec_deque::Iter<'a, HistoryEntry>, Option>>; /// See [BackendConversationState] #[derive(Debug, Clone)] @@ -629,13 +676,7 @@ pub struct BackendConversationStateImpl<'a, T, U> { pub model_id: Option<&'a str>, } -impl - BackendConversationStateImpl< - '_, - std::collections::vec_deque::Iter<'_, (UserMessage, AssistantMessage)>, - Option>, - > -{ +impl BackendConversationStateImpl<'_, std::collections::vec_deque::Iter<'_, HistoryEntry>, Option>> { fn into_fig_conversation_state(self) -> eyre::Result { let history = flatten_history(self.context_messages.unwrap_or_default().iter().chain(self.history)); let user_input_message: UserInputMessage = self @@ -659,7 +700,7 @@ impl // Count the chars used by the messages in the history. // this clone is cheap let history = self.history.clone(); - for (user, assistant) in history { + for HistoryEntry { user, assistant, .. } in history { user_chars += *user.char_count(); assistant_chars += *assistant.char_count(); } @@ -669,7 +710,7 @@ impl .context_messages .as_ref() .map(|v| { - v.iter().fold(0, |acc, (user, assistant)| { + v.iter().fold(0, |acc, HistoryEntry { user, assistant, .. }| { acc + *user.char_count() + *assistant.char_count() }) }) @@ -694,9 +735,9 @@ pub struct ConversationSize { /// Converts a list of user/assistant message pairs into a flattened list of ChatMessage. fn flatten_history<'a, T>(history: T) -> Vec where - T: Iterator, + T: Iterator, { - history.fold(Vec::new(), |mut acc, (user, assistant)| { + history.fold(Vec::new(), |mut acc, HistoryEntry { user, assistant, .. 
}| { acc.push(ChatMessage::UserInputMessage(user.clone().into_history_entry())); acc.push(ChatMessage::AssistantResponseMessage(assistant.clone().into())); acc @@ -738,7 +779,7 @@ fn format_hook_context<'a>(hook_results: impl IntoIterator, + history: &mut VecDeque, next_message: &mut Option, tools: &HashMap>, ) -> (usize, usize) { @@ -755,7 +796,7 @@ fn enforce_conversation_invariants( .iter() .enumerate() .skip(1) - .find(|(_, (m, _))| -> bool { !m.has_tool_use_results() }) + .find(|(_, HistoryEntry { user, .. })| -> bool { !user.has_tool_use_results() }) .map(|v| v.0) { Some(i) => { @@ -779,7 +820,7 @@ fn enforce_conversation_invariants( // If the first message contains tool results, then we add the results to the content field // instead. This is required to avoid validation errors. - if let Some((user, _)) = history.front_mut() { + if let Some(HistoryEntry { user, .. }) = history.front_mut() { if user.has_tool_use_results() { user.replace_content_with_tool_use_results(); } @@ -792,7 +833,12 @@ fn enforce_conversation_invariants( history.range(valid_history_range.0..valid_history_range.1).last(), ) { (Some(next_message), prev_msg) if next_message.has_tool_use_results() => match prev_msg { - None | Some((_, AssistantMessage::Response { .. })) => { + // None | Some((_, AssistantMessage::Response { .. }, _)) => { + None + | Some(HistoryEntry { + assistant: AssistantMessage::Response { .. }, + .. + }) => { next_message.replace_content_with_tool_use_results(); }, _ => (), @@ -802,7 +848,14 @@ fn enforce_conversation_invariants( // If the last message from the assistant contains tool uses AND next_message is set, we need to // ensure that next_message contains tool results. - if let (Some((_, AssistantMessage::ToolUse { tool_uses, .. })), Some(user_msg)) = ( + // if let (Some((_, AssistantMessage::ToolUse { tool_uses, .. }, _)), Some(user_msg)) = ( + if let ( + Some(HistoryEntry { + assistant: AssistantMessage::ToolUse { tool_uses, .. }, + .. 
+ }), + Some(user_msg), + ) = ( history.range(valid_history_range.0..valid_history_range.1).last(), next_message, ) { @@ -822,10 +875,7 @@ fn enforce_conversation_invariants( valid_history_range } -fn enforce_tool_use_history_invariants( - history: &mut VecDeque<(UserMessage, AssistantMessage)>, - tools: &HashMap>, -) { +fn enforce_tool_use_history_invariants(history: &mut VecDeque, tools: &HashMap>) { let tool_names: HashSet<_> = tools .values() .flat_map(|tools| { @@ -836,7 +886,7 @@ fn enforce_tool_use_history_invariants( .filter(|name| *name != DUMMY_TOOL_NAME) .collect(); - for (_, assistant) in history { + for HistoryEntry { assistant, .. } in history { if let AssistantMessage::ToolUse { tool_uses, .. } = assistant { for tool_use in tool_uses { if tool_names.contains(tool_use.name.as_str()) { @@ -1019,7 +1069,7 @@ mod tests { .await .unwrap(); assert_conversation_state_invariants(s, i); - conversation.push_assistant_message(&mut os, AssistantMessage::new_response(None, i.to_string())); + conversation.push_assistant_message(&mut os, AssistantMessage::new_response(None, i.to_string()), None); conversation.set_next_user_message(i.to_string()).await; } } @@ -1056,6 +1106,7 @@ mod tests { args: serde_json::Value::Null, ..Default::default() }]), + None, ); conversation.add_tool_results(vec![ToolUseResult { tool_use_id: "tool_id".to_string(), @@ -1083,6 +1134,7 @@ mod tests { args: serde_json::Value::Null, ..Default::default() }]), + None, ); conversation.add_tool_results(vec![ToolUseResult { tool_use_id: "tool_id".to_string(), @@ -1090,7 +1142,7 @@ mod tests { status: ToolResultStatus::Success, }]); } else { - conversation.push_assistant_message(&mut os, AssistantMessage::new_response(None, i.to_string())); + conversation.push_assistant_message(&mut os, AssistantMessage::new_response(None, i.to_string()), None); conversation.set_next_user_message(i.to_string()).await; } } @@ -1146,7 +1198,7 @@ mod tests { assert_conversation_state_invariants(s, i); - 
conversation.push_assistant_message(&mut os, AssistantMessage::new_response(None, i.to_string())); + conversation.push_assistant_message(&mut os, AssistantMessage::new_response(None, i.to_string()), None); conversation.set_next_user_message(i.to_string()).await; } } @@ -1219,7 +1271,7 @@ mod tests { s.user_input_message.content ); - conversation.push_assistant_message(&mut os, AssistantMessage::new_response(None, i.to_string())); + conversation.push_assistant_message(&mut os, AssistantMessage::new_response(None, i.to_string()), None); conversation.set_next_user_message(i.to_string()).await; } } diff --git a/crates/chat-cli/src/cli/chat/mod.rs b/crates/chat-cli/src/cli/chat/mod.rs index 3b06d47ead..86d7ce1597 100644 --- a/crates/chat-cli/src/cli/chat/mod.rs +++ b/crates/chat-cli/src/cli/chat/mod.rs @@ -29,6 +29,7 @@ use std::io::{ Write, }; use std::process::ExitCode; +use std::sync::Arc; use std::time::Duration; use amzn_codewhisperer_client::types::SubscriptionStatus; @@ -71,7 +72,8 @@ use parse::{ }; use parser::{ RecvErrorKind, - ResponseParser, + RequestMetadata, + SendMessageStream, }; use regex::Regex; use spinners::{ @@ -82,6 +84,10 @@ use thiserror::Error; use time::OffsetDateTime; use token_counter::TokenCounter; use tokio::signal::ctrl_c; +use tokio::sync::{ + Mutex, + broadcast, +}; use tool_manager::{ ToolManager, ToolManagerBuilder, @@ -110,9 +116,11 @@ use winnow::Partial; use winnow::stream::Offset; use super::agent::PermissionEvalResult; -use crate::api_client::ApiClientError; use crate::api_client::model::ToolResultStatus; -use crate::api_client::send_message_output::SendMessageOutput; +use crate::api_client::{ + self, + ApiClientError, +}; use crate::auth::AuthError; use crate::auth::builder_id::is_idc_user; use crate::cli::agent::Agents; @@ -128,7 +136,10 @@ use crate::cli::chat::cli::prompts::{ use crate::database::settings::Setting; use crate::mcp_client::Prompt; use crate::os::Os; -use crate::telemetry::core::ToolUseEventBuilder; +use 
crate::telemetry::core::{ + ChatAddedMessage, + ToolUseEventBuilder, +}; use crate::telemetry::{ ReasonCode, TelemetryResult, @@ -474,8 +485,10 @@ pub struct ChatSession { /// [ConversationState]. conversation: ConversationState, tool_uses: Vec, + /// [RequestMetadata] about the ongoing operation. + current_request_metadata: Option, pending_tool_index: Option, - /// Telemetry events to be sent as part of the conversation. + /// Telemetry events to be sent as part of the conversation. The HashMap key is tool_use_id. tool_use_telemetry_events: HashMap, /// State used to keep track of tool use relation tool_use_status: ToolUseStatus, @@ -485,6 +498,7 @@ pub struct ChatSession { pending_prompts: VecDeque, interactive: bool, inner: Option, + ctrlc_rx: broadcast::Receiver<()>, } impl ChatSession { @@ -568,6 +582,23 @@ impl ChatSession { }, }; + // Spawn a task for listening and broadcasting sigints. + let (ctrlc_tx, ctrlc_rx) = tokio::sync::broadcast::channel(4); + tokio::spawn(async move { + loop { + match ctrl_c().await { + Ok(_) => { + let _ = ctrlc_tx + .send(()) + .map_err(|err| error!(?err, "failed to send ctrlc to broadcast channel")); + }, + Err(err) => { + error!(?err, "Encountered an error while receiving a ctrl+c"); + }, + } + } + }); + Ok(Self { stdout, stderr, @@ -578,6 +609,7 @@ impl ChatSession { spinner: None, conversation, tool_uses: vec![], + current_request_metadata: None, pending_tool_index: None, tool_use_telemetry_events: HashMap::new(), tool_use_status: ToolUseStatus::Idle, @@ -585,6 +617,7 @@ impl ChatSession { pending_prompts: VecDeque::new(), interactive, inner: Some(ChatState::default()), + ctrlc_rx, }) } @@ -592,7 +625,7 @@ impl ChatSession { // Update conversation state with new tool information self.conversation.update_state(false).await; - let ctrl_c_stream = ctrl_c(); + let mut ctrl_c_stream = self.ctrlc_rx.resubscribe(); let result = match self.inner.take().expect("state must always be Some") { ChatState::PromptUser { skip_printing_tools 
} => { match (self.interactive, self.tool_uses.is_empty()) { @@ -611,7 +644,7 @@ impl ChatSession { ChatState::HandleInput { input } => { tokio::select! { res = self.handle_input(os, input) => res, - Ok(_) = ctrl_c_stream => Err(ChatError::Interrupted { tool_uses: Some(self.tool_uses.clone()) }) + Ok(_) = ctrl_c_stream.recv() => Err(ChatError::Interrupted { tool_uses: Some(self.tool_uses.clone()) }) } }, ChatState::CompactHistory { @@ -619,29 +652,39 @@ impl ChatSession { show_summary, strategy, } => { - tokio::select! { - res = self.compact_history(os, prompt, show_summary, strategy) => res, - Ok(_) = ctrl_c_stream => Err(ChatError::Interrupted { tool_uses: Some(self.tool_uses.clone()) }) - } + // compact_history manages ctrl+c handling + self.compact_history(os, prompt, show_summary, strategy).await }, ChatState::ExecuteTools => { let tool_uses_clone = self.tool_uses.clone(); tokio::select! { res = self.tool_use_execute(os) => res, - Ok(_) = ctrl_c_stream => Err(ChatError::Interrupted { tool_uses: Some(tool_uses_clone) }) + Ok(_) = ctrl_c_stream.recv() => Err(ChatError::Interrupted { tool_uses: Some(tool_uses_clone) }) } }, - ChatState::ValidateTools(tool_uses) => { + ChatState::ValidateTools { + tool_uses, + request_metadata, + } => { tokio::select! { - res = self.validate_tools(os, tool_uses) => res, - Ok(_) = ctrl_c_stream => Err(ChatError::Interrupted { tool_uses: None }) + res = self.validate_tools(os, tool_uses, request_metadata) => res, + Ok(_) = ctrl_c_stream.recv() => Err(ChatError::Interrupted { tool_uses: None }) } }, - ChatState::HandleResponseStream(response) => tokio::select! 
{ - res = self.handle_response(os, response) => res, - Ok(_) = ctrl_c_stream => { - self.send_chat_telemetry(os, None, TelemetryResult::Cancelled, None, None, None).await; - Err(ChatError::Interrupted { tool_uses: None }) + ChatState::HandleResponseStream(conversation_state) => { + let request_metadata: Arc>> = Arc::new(Mutex::new(None)); + let request_metadata_clone = Arc::clone(&request_metadata); + + tokio::select! { + res = self.handle_response(os, conversation_state, request_metadata_clone) => res, + Ok(_) = ctrl_c_stream.recv() => { + debug!(?request_metadata, "ctrlc received"); + // Wait for handle_response to finish handling the ctrlc. + tokio::time::sleep(Duration::from_millis(5)).await; + let request_metadata = request_metadata.lock().await.take(); + self.send_chat_telemetry(os, TelemetryResult::Cancelled, None, None, None, request_metadata.as_ref()).await; + Err(ChatError::Interrupted { tool_uses: None }) + } } }, ChatState::Exit => return Ok(()), @@ -690,6 +733,7 @@ impl ChatSession { None, "Tool uses were interrupted, waiting for the next user prompt".to_string(), ), + None, ); }, _ => (), @@ -895,6 +939,7 @@ impl ChatSession { self.conversation.enforce_conversation_invariants(); self.conversation.reset_next_user_message(); self.pending_tool_index = None; + self.current_request_metadata = None; self.inner = Some(ChatState::PromptUser { skip_printing_tools: false, @@ -937,11 +982,16 @@ pub enum ChatState { /// Handle the user input, depending on if any tools require execution. HandleInput { input: String }, /// Validate the list of tool uses provided by the model. - ValidateTools(Vec), + ValidateTools { + tool_uses: Vec, + /// The [RequestMetadata] about the request used which requested the tool uses. + request_metadata: Option, + }, /// Execute the list of tools. ExecuteTools, /// Consume the response stream and display to the user. 
- HandleResponseStream(SendMessageOutput), + HandleResponseStream(crate::api_client::model::ConversationState), + // HandleResponseStream(SendMessageOutput), /// Compact the chat history. CompactHistory { /// Custom prompt to include as part of history compaction. @@ -964,6 +1014,31 @@ impl Default for ChatState { } impl ChatSession { + /// Sends a request to the SendMessage API. Emits error telemetry on failure. + async fn send_message( + &mut self, + os: &mut Os, + conversation_state: api_client::model::ConversationState, + request_metadata_lock: Arc>>, + ) -> Result { + match SendMessageStream::send_message(&os.client, conversation_state, request_metadata_lock).await { + Ok(res) => Ok(res), + Err(err) => { + let (reason, reason_desc) = get_error_reason(&err); + self.send_chat_telemetry( + os, + TelemetryResult::Failed, + Some(reason), + Some(reason_desc), + err.status_code(), + None, + ) + .await; + Err(err.into()) + }, + } + } + async fn spawn(&mut self, os: &mut Os) -> Result<()> { let is_small_screen = self.terminal_width() < GREETING_BREAK_POINT; if os @@ -1062,10 +1137,45 @@ impl ChatSession { /// will fail with [ChatError::CompactHistoryFailure]. async fn compact_history( &mut self, - os: &Os, + os: &mut Os, custom_prompt: Option, show_summary: bool, strategy: CompactStrategy, + ) -> Result { + // Same pattern as is done for handle_response for getting request metadata on sigint. + let request_metadata: Arc>> = Arc::new(Mutex::new(None)); + let request_metadata_clone = Arc::clone(&request_metadata); + let mut ctrl_c_stream = self.ctrlc_rx.resubscribe(); + + tokio::select! { + res = self.compact_history_impl(os, custom_prompt, show_summary, strategy, request_metadata_clone) => res, + Ok(_) = ctrl_c_stream.recv() => { + debug!(?request_metadata, "ctrlc received"); + // Wait for handle_response to finish handling the ctrlc. + tokio::time::sleep(Duration::from_millis(5)).await; + let request_metadata = request_metadata.lock().await.take(); + println!("??? 
{:?}", request_metadata); + self.send_chat_telemetry( + os, + TelemetryResult::Cancelled, + None, + None, + None, + request_metadata.as_ref(), + ) + .await; + Err(ChatError::Interrupted { tool_uses: Some(self.tool_uses.clone()) }) + } + } + } + + async fn compact_history_impl( + &mut self, + os: &mut Os, + custom_prompt: Option, + show_summary: bool, + strategy: CompactStrategy, + request_metadata_lock: Arc>>, ) -> Result { let hist = self.conversation.history(); debug!(?strategy, ?hist, "compacting history"); @@ -1096,21 +1206,17 @@ impl ChatSession { )?; } - // Send a request for summarizing the history. let summary_state = self .conversation .create_summary_request(os, custom_prompt.as_ref(), strategy) .await?; - execute!(self.stderr, cursor::Hide, style::Print("\n"))?; - if self.interactive { + execute!(self.stderr, cursor::Hide, style::Print("\n"))?; self.spinner = Some(Spinner::new(Spinners::Dots, "Creating summary...".to_string())); } - let response = os.client.send_message(summary_state).await; - - let response = match response { + let mut response = match self.send_message(os, summary_state, request_metadata_lock).await { Ok(res) => res, Err(err) => { if self.interactive { @@ -1123,19 +1229,11 @@ impl ChatSession { )?; } - let (reason, reason_desc) = get_error_reason(&err); - self.send_chat_telemetry( - os, - None, - TelemetryResult::Failed, - Some(reason), - Some(reason_desc), - err.status_code(), - ) - .await; + // If the request fails due to context window overflow, then we'll see if it's + // retryable according to the passed strategy. let history_len = self.conversation.history().len(); match err { - ApiClientError::ContextWindowOverflow { .. } => { + ChatError::Client(err) if matches!(*err, ApiClientError::ContextWindowOverflow { .. }) => { error!(?strategy, "failed to send compaction request"); // If there's only two messages in the history, we have no choice but to // truncate it. 
We use two messages since it's almost guaranteed to contain: @@ -1178,36 +1276,43 @@ impl ChatSession { return Err(ChatError::CompactHistoryFailure); } }, - err => return Err(err.into()), + err => return Err(err), } }, }; - let request_id = response.request_id().map(|s| s.to_string()); - let summary = { - let mut parser = ResponseParser::new(response); + let (summary, request_metadata) = { loop { - match parser.recv().await { - Ok(parser::ResponseEvent::EndStream { message }) => { - break message.content().to_string(); + match response.recv().await { + Some(Ok(parser::ResponseEvent::EndStream { + message, + request_metadata, + })) => { + break (message.content().to_string(), request_metadata); }, - Ok(_) => (), - Err(err) => { - if let Some(request_id) = &err.request_id { + Some(Ok(_)) => (), + Some(Err(err)) => { + if let Some(request_id) = &err.request_metadata.request_id { self.failed_request_ids.push(request_id.clone()); }; + let (reason, reason_desc) = get_error_reason(&err); self.send_chat_telemetry( os, - err.request_id.clone(), TelemetryResult::Failed, Some(reason), Some(reason_desc), err.status_code(), + Some(&err.request_metadata), ) .await; + return Err(err.into()); }, + None => { + error!("response stream receiver closed before receiving a stop event"); + return Err(ChatError::Custom("Stream failed during compaction".into())); + }, } } }; @@ -1222,11 +1327,18 @@ impl ChatSession { )?; } - self.send_chat_telemetry(os, request_id, TelemetryResult::Succeeded, None, None, None) - .await; - self.conversation - .replace_history_with_summary(summary.clone(), strategy); + .replace_history_with_summary(summary.clone(), strategy, request_metadata.clone()); + + self.send_chat_telemetry( + os, + TelemetryResult::Succeeded, + None, + None, + None, + Some(&request_metadata), + ) + .await; // Print output to the user. { @@ -1287,12 +1399,8 @@ impl ChatSession { // If a next message is set, then retry the request. 
if self.conversation.next_user_message().is_some() { Ok(ChatState::HandleResponseStream( - os.client - .send_message( - self.conversation - .as_sendable_conversation_state(os, &mut self.stderr, false) - .await?, - ) + self.conversation + .as_sendable_conversation_state(os, &mut self.stderr, false) .await?, )) } else { @@ -1595,9 +1703,7 @@ impl ChatSession { self.spinner = Some(Spinner::new(Spinners::Dots, "Thinking...".to_owned())); } - Ok(ChatState::HandleResponseStream( - os.client.send_message(conv_state).await?, - )) + Ok(ChatState::HandleResponseStream(conv_state)) } } @@ -1787,29 +1893,55 @@ impl ChatSession { self.spinner = Some(Spinner::new(Spinners::Dots, "Thinking...".to_string())); } + self.send_chat_telemetry( + os, + TelemetryResult::Succeeded, + None, + None, + None, + self.current_request_metadata.as_ref(), + ) + .await; self.send_tool_use_telemetry(os).await; return Ok(ChatState::HandleResponseStream( - os.client - .send_message( - self.conversation - .as_sendable_conversation_state(os, &mut self.stderr, false) - .await?, - ) + self.conversation + .as_sendable_conversation_state(os, &mut self.stderr, false) .await?, )); } - async fn handle_response(&mut self, os: &mut Os, response: SendMessageOutput) -> Result { - let request_id = response.request_id().map(|s| s.to_string()); + /// Sends a [crate::api_client::ApiClient::send_message] request to the backend and consumes + /// the response stream. + /// + /// In order to handle sigints while also keeping track of metadata about how the + /// response stream was handled, we need a couple extra parameters: + /// - `request_metadata_lock` - Updated with the [RequestMetadata] once it has been received + /// (either though a successful request, or on an error). + /// - `ctrl_c` - a broadcast receiver for whenever sigints are encountered. + /// - `cancel_token` - a [CancellationToken] to prevent handling the response stream on an + /// error or sigint. 
+ /// + /// The top-level caller is expected to check the value of `request_metadata_lock` when a + /// ctrl+c is sent to get the relevant request metadata. + async fn handle_response( + &mut self, + os: &mut Os, + state: crate::api_client::model::ConversationState, + request_metadata_lock: Arc>>, + ) -> Result { + let mut rx = self.send_message(os, state, request_metadata_lock).await?; + + let request_id = rx.request_id().map(String::from); + let mut buf = String::new(); let mut offset = 0; let mut ended = false; - let mut parser = ResponseParser::new(response); let mut state = ParseState::new(Some(self.terminal_width())); let mut response_prefix_printed = false; let mut tool_uses = Vec::new(); let mut tool_name_being_recvd: Option = None; + let mut request_metadata = None; if self.spinner.is_some() { drop(self.spinner.take()); @@ -1823,8 +1955,8 @@ impl ChatSession { } loop { - match parser.recv().await { - Ok(msg_event) => { + match rx.recv().await { + Some(Ok(msg_event)) => { trace!("Consumed: {:?}", msg_event); match msg_event { parser::ResponseEvent::ToolUseStart { name } => { @@ -1856,37 +1988,41 @@ impl ChatSession { tool_uses.push(tool_use); tool_name_being_recvd = None; }, - parser::ResponseEvent::EndStream { message } => { + parser::ResponseEvent::EndStream { + message, + request_metadata: rm, + } => { // This log is attempting to help debug instances where users encounter // the response timeout message. 
if message.content() == RESPONSE_TIMEOUT_CONTENT { error!(?request_id, ?message, "Encountered an unexpected model response"); } - self.conversation.push_assistant_message(os, message); + self.conversation.push_assistant_message(os, message, Some(rm.clone())); + request_metadata = Some(rm); ended = true; }, } }, - Err(recv_error) => { - if let Some(request_id) = &recv_error.request_id { + Some(Err(recv_error)) => { + if let Some(request_id) = &recv_error.request_metadata.request_id { self.failed_request_ids.push(request_id.clone()); }; let (reason, reason_desc) = get_error_reason(&recv_error); self.send_chat_telemetry( os, - recv_error.request_id.clone(), TelemetryResult::Failed, Some(reason), Some(reason_desc), recv_error.status_code(), + Some(&recv_error.request_metadata), ) .await; match recv_error.source { RecvErrorKind::StreamTimeout { source, duration } => { error!( - recv_error.request_id, + recv_error.request_metadata.request_id, ?source, "Encountered a stream timeout after waiting for {}s", duration.as_secs() @@ -1900,6 +2036,7 @@ impl ChatSession { self.conversation.push_assistant_message( os, AssistantMessage::new_response(None, RESPONSE_TIMEOUT_CONTENT.to_string()), + None, ); self.conversation .set_next_user_message( @@ -1909,12 +2046,8 @@ impl ChatSession { .await; self.send_tool_use_telemetry(os).await; return Ok(ChatState::HandleResponseStream( - os.client - .send_message( - self.conversation - .as_sendable_conversation_state(os, &mut self.stderr, false) - .await?, - ) + self.conversation + .as_sendable_conversation_state(os, &mut self.stderr, false) .await?, )); }, @@ -1925,10 +2058,11 @@ impl ChatSession { .. 
} => { error!( - recv_error.request_id, + recv_error.request_metadata.request_id, tool_use_id, name, "The response stream ended before the entire tool use was received" ); - self.conversation.push_assistant_message(os, *message); + self.conversation + .push_assistant_message(os, *message, Some(recv_error.request_metadata)); let tool_results = vec![ToolUseResult { tool_use_id, content: vec![ToolUseResultBlock::Text( @@ -1939,18 +2073,18 @@ impl ChatSession { self.conversation.add_tool_results(tool_results); self.send_tool_use_telemetry(os).await; return Ok(ChatState::HandleResponseStream( - os.client - .send_message( - self.conversation - .as_sendable_conversation_state(os, &mut self.stderr, false) - .await?, - ) + self.conversation + .as_sendable_conversation_state(os, &mut self.stderr, false) .await?, )); }, _ => return Err(recv_error.into()), } }, + None => { + warn!("response stream receiver closed before receiving a stop event"); + ended = true; + }, } // Fix for the markdown parser copied over from q chat: @@ -2000,9 +2134,6 @@ impl ChatSession { } if ended { - self.send_chat_telemetry(os, request_id, TelemetryResult::Succeeded, None, None, None) - .await; - if os .database .settings @@ -2033,18 +2164,31 @@ impl ChatSession { } if !tool_uses.is_empty() { - Ok(ChatState::ValidateTools(tool_uses)) + Ok(ChatState::ValidateTools { + tool_uses, + request_metadata, + }) } else { self.tool_uses.clear(); self.pending_tool_index = None; + // TODO(telem) send recordUserTurnCompletion + // self.send_chat_telemetry(os, request_id, TelemetryResult::Succeeded, None, None, None) + // .await; + // self.send_record_user_turn_completion(); + Ok(ChatState::PromptUser { skip_printing_tools: false, }) } } - async fn validate_tools(&mut self, os: &Os, tool_uses: Vec) -> Result { + async fn validate_tools( + &mut self, + os: &Os, + tool_uses: Vec, + request_metadata: Option, + ) -> Result { let conv_id = self.conversation.conversation_id().to_owned(); debug!(?tool_uses, "Validating 
tool uses"); let mut queued_tools: Vec = Vec::new(); @@ -2122,7 +2266,17 @@ impl ChatSession { } } } + self.conversation.add_tool_results(tool_results); + self.send_chat_telemetry( + os, + TelemetryResult::Succeeded, + None, + None, + None, + request_metadata.as_ref(), + ) + .await; self.send_tool_use_telemetry(os).await; if let ToolUseStatus::Idle = self.tool_use_status { self.tool_use_status = ToolUseStatus::RetryInProgress( @@ -2132,18 +2286,15 @@ impl ChatSession { ); } - let response = os - .client - .send_message( - self.conversation - .as_sendable_conversation_state(os, &mut self.stderr, false) - .await?, - ) - .await?; - return Ok(ChatState::HandleResponseStream(response)); + return Ok(ChatState::HandleResponseStream( + self.conversation + .as_sendable_conversation_state(os, &mut self.stderr, false) + .await?, + )); } self.tool_uses = queued_tools; + self.current_request_metadata = request_metadata; self.pending_tool_index = Some(0); Ok(ChatState::ExecuteTools) } @@ -2298,28 +2449,52 @@ impl ChatSession { Ok(()) } + /// Sends an "codewhispererterminal_addChatMessage" telemetry event. + /// + /// This *MUST* be called in the following cases: + /// 1. After the end of a user turn + /// 2. After tool use execution has completed + /// 3. After an error was encountered during the handling of the response stream, tool use + /// validation, or tool use execution. + /// + /// Note: whether or not to send telemetry should be derived from the chat history rather than + /// having to be manually invoked, although that would require more substantial changes. 
#[allow(clippy::too_many_arguments)] async fn send_chat_telemetry( &self, os: &Os, - request_id: Option, result: TelemetryResult, reason: Option, reason_desc: Option, status_code: Option, + md: Option<&RequestMetadata>, ) { + let data = ChatAddedMessage { + request_id: md.and_then(|md| md.request_id.clone()), + message_id: self.conversation.message_id().map(|s| s.to_owned()), + context_file_length: self.conversation.context_message_length(), + model: self.conversation.model.clone(), + reason, + reason_desc, + status_code, + time_to_first_chunk_ms: md.and_then(|md| md.time_to_first_chunk.map(|d| d.as_secs_f64() * 1000.0)), + time_between_chunks_ms: md.map(|md| { + md.time_between_chunks + .iter() + .map(|d| d.as_secs_f64() * 1000.0) + .collect::>() + }), + chat_conversation_type: md.and_then(|md| md.chat_conversation_type), + tool_use_id: self.conversation.latest_tool_use_ids(), + tool_name: self.conversation.latest_tool_use_names(), + assistant_response_length: md.map(|md| md.response_size as i32), + }; os.telemetry .send_chat_added_message( &os.database, self.conversation.conversation_id().to_owned(), - self.conversation.message_id().map(|s| s.to_owned()), - request_id, - self.conversation.context_message_length(), result, - reason, - reason_desc, - status_code, - self.conversation.model.clone(), + data, ) .await .ok(); diff --git a/crates/chat-cli/src/cli/chat/parser.rs b/crates/chat-cli/src/cli/chat/parser.rs index 821ba10876..4ef4f3dc02 100644 --- a/crates/chat-cli/src/cli/chat/parser.rs +++ b/crates/chat-cli/src/cli/chat/parser.rs @@ -1,30 +1,49 @@ +use std::sync::Arc; use std::time::{ Duration, Instant, }; use eyre::Result; +use serde::{ + Deserialize, + Serialize, +}; use thiserror::Error; +use tokio::sync::{ + Mutex, + mpsc, +}; +use tokio_util::sync::CancellationToken; use tracing::{ + debug, error, info, trace, + warn, }; use super::message::{ AssistantMessage, AssistantToolUse, }; -use crate::api_client::model::ChatResponseStream; +use 
crate::api_client::model::{ + ChatResponseStream, + ConversationState, +}; use crate::api_client::send_message_output::SendMessageOutput; +use crate::api_client::{ + ApiClient, + ApiClientError, +}; use crate::telemetry::ReasonCode; +use crate::telemetry::core::ChatConversationType; #[derive(Debug, Error)] pub struct RecvError { - /// The request id associated with the [SendMessageOutput] stream. - pub request_id: Option, #[source] pub source: RecvErrorKind, + pub request_metadata: RequestMetadata, } impl RecvError { @@ -34,6 +53,7 @@ impl RecvError { RecvErrorKind::Json(_) => None, RecvErrorKind::StreamTimeout { .. } => None, RecvErrorKind::UnexpectedToolUseEos { .. } => None, + RecvErrorKind::Cancelled => None, } } } @@ -45,6 +65,7 @@ impl ReasonCode for RecvError { RecvErrorKind::Json(_) => "RecvErrorJson".to_string(), RecvErrorKind::StreamTimeout { .. } => "RecvErrorStreamTimeout".to_string(), RecvErrorKind::UnexpectedToolUseEos { .. } => "RecvErrorUnexpectedToolUseEos".to_string(), + RecvErrorKind::Cancelled => "Interrupted".to_string(), } } } @@ -52,7 +73,7 @@ impl ReasonCode for RecvError { impl std::fmt::Display for RecvError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Failed to receive the next message: ")?; - if let Some(request_id) = self.request_id.as_ref() { + if let Some(request_id) = self.request_metadata.request_id.as_ref() { write!(f, "request_id: {}, error: ", request_id)?; } write!(f, "{}", self.source)?; @@ -90,6 +111,100 @@ pub enum RecvErrorKind { message: Box, time_elapsed: Duration, }, + /// The stream processing task was cancelled + #[error("Stream handling was cancelled")] + Cancelled, +} + +/// Represents a response stream from a call to the SendMessage API. +/// +/// Send a request using [Self::send_message]. +#[derive(Debug)] +pub struct SendMessageStream { + request_id: Option, + ev_rx: mpsc::Receiver>, + /// Used for graceful cleanup of the stream handler task. 
Required for setting request metadata + /// on drop (e.g. in the sigint case). + cancel_token: CancellationToken, +} + +impl Drop for SendMessageStream { + fn drop(&mut self) { + self.cancel_token.cancel(); + } +} + +impl SendMessageStream { + /// Sends a SendMessage request to the backend, returning the response stream to consume. + /// + /// You should repeatedly call [Self::recv] to receive [ResponseEvent]'s until a + /// [ResponseEvent::EndStream] value is returned. + /// + /// # Arguments + /// + /// * `client` - api client to make the request with + /// * `conversation_state` - the [crate::api_client::model::ConversationState] to send + /// * `request_metadata_lock` - a mutex that will be updated with metadata about the consumed + /// response stream on stream completion (ie, [ResponseEvent::EndStream] is returned) or on + /// drop. + /// + /// # Details + /// + /// Why `request_metadata_lock`? Because when a sigint occurs, we need to capture how much of + /// the response stream was consumed for telemetry purposes. From the sigint handler, there's + /// no easy way around this currently without a solution that requires global state - hence, a + /// mutex. + /// + /// Internally, [Self::send_message] spawns a new task that will continually consume the + /// response stream which will be cancelled when [Self] is dropped (e.g., when the surrounding + /// future is aborted in the sigint case). The task will gracefully end with updating the mutex + /// with [RequestMetadata]. 
+ pub async fn send_message( + client: &ApiClient, + conversation_state: ConversationState, + request_metadata_lock: Arc>>, + ) -> Result { + let message_id = uuid::Uuid::new_v4().to_string(); + info!(?message_id, "Generated new message id"); + + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + + let start_time = Instant::now(); + debug!(?start_time, "sending send_message request"); + let response = client.send_message(conversation_state).await?; + let elapsed = start_time.elapsed(); + debug!(?elapsed, "send_message succeeded"); + + let request_id = response.request_id().map(str::to_string); + let (ev_tx, ev_rx) = mpsc::channel(16); + tokio::spawn(async move { + ResponseParser::new( + response, + message_id, + ev_tx, + start_time, + cancel_token_clone, + request_metadata_lock, + ) + .try_recv() + .await; + }); + + Ok(Self { + request_id, + cancel_token, + ev_rx, + }) + } + + pub async fn recv(&mut self) -> Option> { + self.ev_rx.recv().await + } + + pub fn request_id(&self) -> Option<&str> { + self.request_id.as_deref() + } } /// State associated with parsing a [ChatResponseStream] into a [Message]. @@ -99,13 +214,18 @@ pub enum RecvErrorKind { /// You should repeatedly call [Self::recv] to receive [ResponseEvent]'s until a /// [ResponseEvent::EndStream] value is returned. #[derive(Debug)] -pub struct ResponseParser { - /// The response to consume and parse into a sequence of [Ev]. +struct ResponseParser { + /// The response to consume and parse into a sequence of [ResponseEvent]. response: SendMessageOutput, - /// Buffer to hold the next event in [SendMessageOutput]. - peek: Option, + event_tx: mpsc::Sender>, + /// Message identifier for the assistant's response. Randomly generated on creation. message_id: String, + + ended: bool, + + /// Buffer to hold the next event in [SendMessageOutput]. + peek: Option, /// Buffer for holding the accumulated assistant response. 
assistant_text: String, /// Tool uses requested by the model. @@ -113,24 +233,71 @@ pub struct ResponseParser { /// Whether or not we are currently receiving tool use delta events. Tuple of /// `Some((tool_use_id, name))` if true, [None] otherwise. parsing_tool_use: Option<(String, String)>, + + request_metadata: Arc>>, + cancel_token: CancellationToken, + + // metadata fields + /// Time immediately after sending the request. + start_time: Instant, + /// Total size (in bytes) of the response received so far. + received_response_size: usize, + time_to_first_chunk: Option, + time_between_chunks: Vec, } impl ResponseParser { - pub fn new(response: SendMessageOutput) -> Self { - let message_id = uuid::Uuid::new_v4().to_string(); - info!(?message_id, "Generated new message id"); + fn new( + response: SendMessageOutput, + message_id: String, + event_tx: mpsc::Sender>, + start_time: Instant, + cancel_token: CancellationToken, + request_metadata: Arc>>, + ) -> Self { Self { response, - peek: None, message_id, + ended: false, + event_tx, + peek: None, assistant_text: String::new(), tool_uses: Vec::new(), parsing_tool_use: None, + start_time, + received_response_size: 0, + time_to_first_chunk: None, + time_between_chunks: Vec::new(), + request_metadata, + cancel_token, + } + } + + async fn try_recv(&mut self) { + loop { + if self.ended { + trace!("response stream has ended"); + return; + } + + let cancel_token = self.cancel_token.clone(); + tokio::select! 
{ + res = self.recv() => { + let _ = self.event_tx.send(res).await.map_err(|err| error!(?err, "failed to send event to channel")); + }, + _ = cancel_token.cancelled() => { + debug!("response parser was cancelled"); + let err = self.error(RecvErrorKind::Cancelled); + *self.request_metadata.lock().await = Some(err.request_metadata.clone()); + let _ = self.event_tx.send(Err(err)).await.map_err(|err| error!(?err, "failed to send error to channel")); + return; + }, + } } } /// Consumes the associated [ConverseStreamResponse] until a valid [ResponseEvent] is parsed. - pub async fn recv(&mut self) -> Result { + async fn recv(&mut self) -> Result { if let Some((id, name)) = self.parsing_tool_use.take() { let tool_use = self.parse_tool_use(id, name).await?; self.tool_uses.push(tool_use.clone()); @@ -182,16 +349,28 @@ impl ResponseParser { Ok(None) => { let message_id = Some(self.message_id.clone()); let content = std::mem::take(&mut self.assistant_text); - let message = if self.tool_uses.is_empty() { - AssistantMessage::new_response(message_id, content) + let (message, conv_type) = if self.tool_uses.is_empty() { + ( + AssistantMessage::new_response(message_id, content), + ChatConversationType::NotToolUse, + ) } else { - AssistantMessage::new_tool_use( - message_id, - content, - self.tool_uses.clone().into_iter().collect(), + ( + AssistantMessage::new_tool_use( + message_id, + content, + self.tool_uses.clone().into_iter().collect(), + ), + ChatConversationType::ToolUse, ) }; - return Ok(ResponseEvent::EndStream { message }); + let request_metadata = self.make_metadata(Some(conv_type)); + *self.request_metadata.lock().await = Some(request_metadata.clone()); + self.ended = true; + return Ok(ResponseEvent::EndStream { + message, + request_metadata, + }); }, Err(err) => return Err(err), } @@ -297,11 +476,31 @@ impl ResponseParser { let result = self.response.recv().await; let duration = std::time::Instant::now().duration_since(start); match result { - Ok(r) => { - trace!(?r, 
"Received new event"); - Ok(r) + Ok(ev) => { + trace!(?ev, "Received new event"); + + // Track metadata about the chunk. + self.time_to_first_chunk + .get_or_insert_with(|| self.start_time.elapsed()); + self.time_between_chunks.push(duration); + if let Some(r) = ev.as_ref() { + match r { + ChatResponseStream::AssistantResponseEvent { content } => { + self.received_response_size += content.len(); + }, + ChatResponseStream::ToolUseEvent { input, .. } => { + self.received_response_size += input.as_ref().map(String::len).unwrap_or_default(); + }, + _ => { + warn!(?r, "received unexpected event from the response stream"); + }, + } + } + + Ok(ev) }, Err(err) => { + error!(?err, "failed to receive the next event"); if duration.as_secs() >= 59 { Err(self.error(RecvErrorKind::StreamTimeout { source: err, duration })) } else { @@ -318,8 +517,18 @@ impl ResponseParser { /// Helper to create a new [RecvError] populated with the associated request id for the stream. fn error(&self, source: impl Into) -> RecvError { RecvError { - request_id: self.request_id().map(str::to_string), source: source.into(), + request_metadata: self.make_metadata(None), + } + } + + fn make_metadata(&self, chat_conversation_type: Option) -> RequestMetadata { + RequestMetadata { + request_id: self.request_id().map(String::from), + time_to_first_chunk: self.time_to_first_chunk, + time_between_chunks: self.time_between_chunks.clone(), + response_size: self.received_response_size, + chat_conversation_type, } } } @@ -339,9 +548,26 @@ pub enum ResponseEvent { /// previously emitted. This should be stored in the conversation history and sent in /// subsequent requests. message: AssistantMessage, + /// Metadata for the request stream. + request_metadata: RequestMetadata, }, } +/// Metadata about the sent request and associated response stream. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RequestMetadata { + /// The request id associated with the [SendMessageOutput] stream. 
+ pub request_id: Option, + /// Time until the first chunk was received. + pub time_to_first_chunk: Option, + /// Time between each received chunk in the stream. + pub time_between_chunks: Vec, + /// Total size (in bytes) of the response. + pub response_size: usize, + /// [ChatConversationType] for the returned assistant message. + pub chat_conversation_type: Option, +} + #[cfg(test)] mod tests { use super::*; @@ -394,7 +620,14 @@ mod tests { ]; events.reverse(); let mock = SendMessageOutput::Mock(events); - let mut parser = ResponseParser::new(mock); + let mut parser = ResponseParser::new( + mock, + "".to_string(), + mpsc::channel(32).0, + Instant::now(), + CancellationToken::new(), + Arc::new(Mutex::new(None)), + ); for _ in 0..5 { println!("{:?}", parser.recv().await.unwrap()); diff --git a/crates/chat-cli/src/telemetry/core.rs b/crates/chat-cli/src/telemetry/core.rs index b5758ef04d..2758fa2095 100644 --- a/crates/chat-cli/src/telemetry/core.rs +++ b/crates/chat-cli/src/telemetry/core.rs @@ -7,6 +7,7 @@ use strum::{ EnumString, }; +use super::definitions::types::CodewhispererterminalChatConversationType; use crate::telemetry::definitions::IntoMetricDatum; use crate::telemetry::definitions::metrics::{ AmazonqDidSelectProfile, @@ -149,15 +150,23 @@ impl Event { ), EventType::ChatAddedMessage { conversation_id, - context_file_length, - message_id, - request_id, result, - reason, - reason_desc, - status_code, - model, - .. 
+ data: + ChatAddedMessage { + context_file_length, + message_id, + request_id, + reason, + reason_desc, + status_code, + model, + time_to_first_chunk_ms, + time_between_chunks_ms, + chat_conversation_type, + tool_name, + tool_use_id, + assistant_response_length, + }, } => Some( CodewhispererterminalAddChatMessage { create_time: self.created_time, @@ -174,6 +183,18 @@ impl Event { reason_desc: reason_desc.map(Into::into), status_code: status_code.map(|v| v as i64).map(Into::into), codewhispererterminal_model: model.map(Into::into), + codewhispererterminal_time_to_first_chunk_ms: time_to_first_chunk_ms + .map(|v| v as i64) + .map(Into::into), + codewhispererterminal_time_between_chunks_ms: time_between_chunks_ms + .map(|v| v.iter().map(|v| format!("{:.3}", v)).collect::>().join(",")) + .map(Into::into), + codewhispererterminal_chat_conversation_type: chat_conversation_type.map(Into::into), + codewhispererterminal_tool_name: tool_name.map(Into::into), + codewhispererterminal_tool_use_id: tool_use_id.map(Into::into), + codewhispererterminal_assistant_response_length: assistant_response_length + .map(|v| v as i64) + .map(Into::into), } .into_metric_datum(), ), @@ -212,6 +233,12 @@ impl Event { codewhispererterminal_custom_tool_latency: custom_tool_call_latency .map(|l| CodewhispererterminalCustomToolLatency(l as i64)), codewhispererterminal_model: model.map(Into::into), + // codewhispererterminal_is_tool_use_trusted: todo!(), + // codewhispererterminal_tool_execution_duration_ms: todo!(), + // codewhispererterminal_tool_turn_duration_ms: todo!(), + codewhispererterminal_is_tool_use_trusted: None, + codewhispererterminal_tool_execution_duration_ms: None, + codewhispererterminal_tool_turn_duration_ms: None, } .into_metric_datum(), ), @@ -295,6 +322,40 @@ impl Event { } } +#[derive(Debug, Copy, Clone, PartialEq, Eq, EnumString, Display, serde::Serialize, serde::Deserialize)] +pub enum ChatConversationType { + // Names are as requested by science + NotToolUse, + ToolUse, 
+} + +impl From for CodewhispererterminalChatConversationType { + fn from(value: ChatConversationType) -> Self { + match value { + ChatConversationType::NotToolUse => Self::EndTurn, + ChatConversationType::ToolUse => Self::ToolUse, + } + } +} + +/// Optional fields to add for a chatAddedMessage telemetry event. +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, Default)] +pub struct ChatAddedMessage { + pub message_id: Option, + pub request_id: Option, + pub context_file_length: Option, + pub reason: Option, + pub reason_desc: Option, + pub status_code: Option, + pub model: Option, + pub time_to_first_chunk_ms: Option, + pub time_between_chunks_ms: Option>, + pub chat_conversation_type: Option, + pub tool_name: Option, + pub tool_use_id: Option, + pub assistant_response_length: Option, +} + #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] #[serde(tag = "type")] @@ -326,14 +387,8 @@ pub enum EventType { }, ChatAddedMessage { conversation_id: String, - message_id: Option, - request_id: Option, - context_file_length: Option, result: TelemetryResult, - reason: Option, - reason_desc: Option, - status_code: Option, - model: Option, + data: ChatAddedMessage, }, ToolUseSuggested { conversation_id: String, diff --git a/crates/chat-cli/src/telemetry/definitions.rs b/crates/chat-cli/src/telemetry/definitions.rs index 4df4325a12..b229dcbe2f 100644 --- a/crates/chat-cli/src/telemetry/definitions.rs +++ b/crates/chat-cli/src/telemetry/definitions.rs @@ -13,6 +13,7 @@ mod tests { use std::time::SystemTime; use super::*; + use crate::telemetry::core::ChatConversationType; use crate::telemetry::definitions::metrics::CodewhispererterminalAddChatMessage; #[test] @@ -32,6 +33,12 @@ mod tests { reason_desc: None, status_code: None, codewhispererterminal_model: None, + codewhispererterminal_time_to_first_chunk_ms: Some(40.into()), + codewhispererterminal_time_between_chunks_ms: 
Some("1,2,3".to_string().into()), + codewhispererterminal_chat_conversation_type: Some(ChatConversationType::NotToolUse.into()), + codewhispererterminal_tool_use_id: None, + codewhispererterminal_tool_name: None, + codewhispererterminal_assistant_response_length: Some(20.into()), }); let s = serde_json::to_string_pretty(&metric_datum_init).unwrap(); diff --git a/crates/chat-cli/src/telemetry/mod.rs b/crates/chat-cli/src/telemetry/mod.rs index 513826a8fa..f92138d7cd 100644 --- a/crates/chat-cli/src/telemetry/mod.rs +++ b/crates/chat-cli/src/telemetry/mod.rs @@ -4,7 +4,10 @@ pub mod definitions; pub mod endpoint; mod install_method; -use core::ToolUseEventBuilder; +use core::{ + ChatAddedMessage, + ToolUseEventBuilder, +}; use std::str::FromStr; use amzn_codewhisperer_client::types::{ @@ -257,25 +260,13 @@ impl TelemetryThread { &self, database: &Database, conversation_id: String, - message_id: Option, - request_id: Option, - context_file_length: Option, result: TelemetryResult, - reason: Option, - reason_desc: Option, - status_code: Option, - model: Option, + data: ChatAddedMessage, ) -> Result<(), TelemetryError> { let mut event = Event::new(EventType::ChatAddedMessage { conversation_id, - message_id, - request_id, - context_file_length, result, - reason, - reason_desc, - status_code, - model, + data, }); set_start_url_and_region(database, &mut event).await; @@ -473,8 +464,15 @@ impl TelemetryClient { if let EventType::ChatAddedMessage { conversation_id, - message_id, - model, + data: + ChatAddedMessage { + message_id, + model, + time_to_first_chunk_ms, + time_between_chunks_ms, + assistant_response_length, + .. + }, .. 
} = &event.ty { @@ -483,6 +481,9 @@ impl TelemetryClient { let chat_add_message_event = match ChatAddMessageEvent::builder() .conversation_id(conversation_id) .message_id(message_id.clone().unwrap_or("not_set".to_string())) + .set_time_to_first_chunk_milliseconds(*time_to_first_chunk_ms) + .set_time_between_chunks(time_between_chunks_ms.as_ref().map(|v| v.clone())) + .set_response_length(*assistant_response_length) .build() { Ok(event) => event, @@ -653,14 +654,12 @@ mod test { .send_chat_added_message( &database, "conv_id".to_owned(), - Some("message_id".to_owned()), - Some("req_id".to_owned()), - Some(123), TelemetryResult::Succeeded, - None, - None, - None, - None, + ChatAddedMessage { + message_id: Some("message_id".to_owned()), + context_file_length: Some(123), + ..Default::default() + }, ) .await .ok(); diff --git a/crates/chat-cli/telemetry_definitions.json b/crates/chat-cli/telemetry_definitions.json index 1e34f10891..94463c3ea2 100644 --- a/crates/chat-cli/telemetry_definitions.json +++ b/crates/chat-cli/telemetry_definitions.json @@ -88,12 +88,12 @@ { "name": "codewhispererterminal_toolUseId", "type": "string", - "description": "The id assigned to the client by the model representing a tool use event" + "description": "Comma-delimited id(s) of the tool uses requested by the model." }, { "name": "codewhispererterminal_toolName", "type": "string", - "description": "The name associated with a tool" + "description": "Comma-delimited tool name(s) of the tool uses requested by the model." }, { "name": "codewhispererterminal_isToolUseAccepted", @@ -108,7 +108,7 @@ { "name": "codewhispererterminal_utteranceId", "type": "string", - "description": "Id associated with a given response from the model" + "description": "Id associated with a given response from the model. If multiple responses are included, then the id's are comma-delimited in sequential order." 
}, { "name": "codewhispererterminal_userInputId", @@ -155,10 +155,59 @@ "type": "int", "description": "Custom tool call latency in seconds" }, + { + "name": "codewhispererterminal_toolExecutionDurationMs", + "type": "int", + "description": "The amount of time taken to execute a tool. In milliseconds (ms)." + }, + { + "name": "codewhispererterminal_toolTurnDurationMs", + "type": "int", + "description": "Total amount of time the tool turn took, including waiting for user input if required. In milliseconds (ms)." + }, + { + "name": "codewhispererterminal_isToolUseTrusted", + "type": "int", + "description": "Denotes whether or not the tool use was trusted." + }, { "name": "codewhispererterminal_model", "type": "string", "description": "The underlying LLM used by the service, set by the client" }, + { + "name": "codewhispererterminal_timeToFirstChunkMs", + "type": "int", + "description": "Time from sending the request to reading the first chunk in the stream. In milliseconds (ms)." + }, + { + "name": "codewhispererterminal_timeBetweenChunksMs", + "type": "string", + "description": "Comma-delimited times between reading each chunk, starting from the first chunk. In milliseconds (ms)." + }, + { + "name": "codewhispererterminal_chatConversationType", + "type": "string", + "allowedValues": [ + "EndTurn", + "ToolUse" + ], + "description": "Identifies the role of the message in the conversation." + }, + { + "name": "codewhispererterminal_assistantResponseLength", + "type": "int", + "description": "Total length of the assistant response, in bytes. When used to represent multiple responses (e.g. when recording a user turn), then this is the total length of all assistant responses." + }, + { + "name": "codewhispererterminal_followUpCount", + "type": "int", + "description": "Number of cycles in the user turn (ie, number of tool use and tool use result pairs)." 
+ }, + { + "name": "codewhispererterminal_userTurnDurationSeconds", + "type": "int", + "description": "Total time spent in the user turn, starting from when the first request is sent." } ], "metrics": [ @@ -175,6 +224,30 @@ { "name": "codewhispererterminal_addChatMessage", "description": "Captures active usage with Q Chat in shell", + "metadata": [ + { "type": "amazonqConversationId" }, + { "type": "codewhispererterminal_utteranceId" }, + { "type": "credentialStartUrl", "required": false }, + { "type": "ssoRegion", "required": false }, + { "type": "codewhispererterminal_inCloudshell" }, + { "type": "codewhispererterminal_contextFileLength", "required": false }, + { "type": "requestId" }, + { "type": "result", "required": true }, + { "type": "reason", "required": false }, + { "type": "reasonDesc", "required": false }, + { "type": "statusCode", "required": false }, + { "type": "codewhispererterminal_model" }, + { "type": "codewhispererterminal_timeToFirstChunkMs", "required": false }, + { "type": "codewhispererterminal_timeBetweenChunksMs", "required": false }, + { "type": "codewhispererterminal_chatConversationType", "required": false }, + { "type": "codewhispererterminal_toolUseId", "required": false }, + { "type": "codewhispererterminal_toolName", "required": false }, + { "type": "codewhispererterminal_assistantResponseLength", "required": false } + ] + }, + { + "name": "codewhispererterminal_recordUserTurnCompletion", + "description": "Captures information regarding a user turn (the chain of messages beginning with a user prompt, and ending with either an assistant stop event, or a tool use denial). 
Emitted once at the end of every user turn.", "metadata": [ { "type": "amazonqConversationId" }, { "type": "codewhispererterminal_utteranceId" }, @@ -271,7 +344,10 @@ "required": false }, { "type": "codewhispererterminal_customToolLatency", "required": false }, - { "type": "codewhispererterminal_model" } + { "type": "codewhispererterminal_model" }, + { "type": "codewhispererterminal_toolExecutionDurationMs", "required": false }, + { "type": "codewhispererterminal_toolTurnDurationMs", "required": false }, + { "type": "codewhispererterminal_isToolUseTrusted", "required": false } ] }, { From cd0180024f075b709fd92153a4e74b57eb08b8f4 Mon Sep 17 00:00:00 2001 From: Brandon Kiser Date: Tue, 15 Jul 2025 17:56:29 -0700 Subject: [PATCH 2/3] fix lints --- crates/chat-cli/src/cli/chat/mod.rs | 10 +++++----- crates/chat-cli/src/cli/chat/parser.rs | 4 ++-- crates/chat-cli/src/telemetry/core.rs | 2 +- crates/chat-cli/src/telemetry/mod.rs | 2 +- crates/chat-cli/telemetry_definitions.json | 2 +- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/crates/chat-cli/src/cli/chat/mod.rs b/crates/chat-cli/src/cli/chat/mod.rs index 86d7ce1597..771619112d 100644 --- a/crates/chat-cli/src/cli/chat/mod.rs +++ b/crates/chat-cli/src/cli/chat/mod.rs @@ -1915,11 +1915,11 @@ impl ChatSession { /// /// In order to handle sigints while also keeping track of metadata about how the /// response stream was handled, we need a couple extra parameters: - /// - `request_metadata_lock` - Updated with the [RequestMetadata] once it has been received - /// (either though a successful request, or on an error). - /// - `ctrl_c` - a broadcast receiver for whenever sigints are encountered. - /// - `cancel_token` - a [CancellationToken] to prevent handling the response stream on an - /// error or sigint. + /// * `request_metadata_lock` - Updated with the [RequestMetadata] once it has been received + /// (either though a successful request, or on an error). 
+ /// * `ctrl_c` - a broadcast receiver for whenever sigints are encountered. + /// * `cancel_token` - a [CancellationToken] to prevent handling the response stream on an error + /// or sigint. /// /// The top-level caller is expected to check the value of `request_metadata_lock` when a /// ctrl+c is sent to get the relevant request metadata. diff --git a/crates/chat-cli/src/cli/chat/parser.rs b/crates/chat-cli/src/cli/chat/parser.rs index 4ef4f3dc02..0b82730fc0 100644 --- a/crates/chat-cli/src/cli/chat/parser.rs +++ b/crates/chat-cli/src/cli/chat/parser.rs @@ -145,8 +145,8 @@ impl SendMessageStream { /// * `client` - api client to make the request with /// * `conversation_state` - the [crate::api_client::model::ConversationState] to send /// * `request_metadata_lock` - a mutex that will be updated with metadata about the consumed - /// response stream on stream completion (ie, [ResponseEvent::EndStream] is returned) or on - /// drop. + /// response stream on stream completion (ie, [ResponseEvent::EndStream] is returned) or on + /// drop. 
/// /// # Details /// diff --git a/crates/chat-cli/src/telemetry/core.rs b/crates/chat-cli/src/telemetry/core.rs index 2758fa2095..691ec0894c 100644 --- a/crates/chat-cli/src/telemetry/core.rs +++ b/crates/chat-cli/src/telemetry/core.rs @@ -332,7 +332,7 @@ pub enum ChatConversationType { impl From for CodewhispererterminalChatConversationType { fn from(value: ChatConversationType) -> Self { match value { - ChatConversationType::NotToolUse => Self::EndTurn, + ChatConversationType::NotToolUse => Self::NotToolUse, ChatConversationType::ToolUse => Self::ToolUse, } } diff --git a/crates/chat-cli/src/telemetry/mod.rs b/crates/chat-cli/src/telemetry/mod.rs index f92138d7cd..62d98696a0 100644 --- a/crates/chat-cli/src/telemetry/mod.rs +++ b/crates/chat-cli/src/telemetry/mod.rs @@ -482,7 +482,7 @@ impl TelemetryClient { .conversation_id(conversation_id) .message_id(message_id.clone().unwrap_or("not_set".to_string())) .set_time_to_first_chunk_milliseconds(*time_to_first_chunk_ms) - .set_time_between_chunks(time_between_chunks_ms.as_ref().map(|v| v.clone())) + .set_time_between_chunks(time_between_chunks_ms.clone()) .set_response_length(*assistant_response_length) .build() { diff --git a/crates/chat-cli/telemetry_definitions.json b/crates/chat-cli/telemetry_definitions.json index 94463c3ea2..f859500525 100644 --- a/crates/chat-cli/telemetry_definitions.json +++ b/crates/chat-cli/telemetry_definitions.json @@ -189,7 +189,7 @@ "name": "codewhispererterminal_chatConversationType", "type": "string", "allowedValues": [ - "EndTurn", + "NotToolUse", "ToolUse" ], "description": "Identifies the role of the message in the conversation." 
From f8c0608efac293dde4ae4cd040a65b25f3524cbc Mon Sep 17 00:00:00 2001 From: Brandon Kiser Date: Wed, 16 Jul 2025 09:29:18 -0700 Subject: [PATCH 3/3] cleanup --- crates/chat-cli/src/cli/chat/conversation.rs | 1 - crates/chat-cli/src/cli/chat/mod.rs | 16 ++++------------ crates/chat-cli/src/telemetry/core.rs | 6 +++--- crates/chat-cli/src/telemetry/mod.rs | 8 ++++---- 4 files changed, 11 insertions(+), 20 deletions(-) diff --git a/crates/chat-cli/src/cli/chat/conversation.rs b/crates/chat-cli/src/cli/chat/conversation.rs index 63f0940903..9704a08522 100644 --- a/crates/chat-cli/src/cli/chat/conversation.rs +++ b/crates/chat-cli/src/cli/chat/conversation.rs @@ -848,7 +848,6 @@ fn enforce_conversation_invariants( // If the last message from the assistant contains tool uses AND next_message is set, we need to // ensure that next_message contains tool results. - // if let (Some((_, AssistantMessage::ToolUse { tool_uses, .. }, _)), Some(user_msg)) = ( if let ( Some(HistoryEntry { assistant: AssistantMessage::ToolUse { tool_uses, .. }, diff --git a/crates/chat-cli/src/cli/chat/mod.rs b/crates/chat-cli/src/cli/chat/mod.rs index 771619112d..23e33590fa 100644 --- a/crates/chat-cli/src/cli/chat/mod.rs +++ b/crates/chat-cli/src/cli/chat/mod.rs @@ -137,7 +137,7 @@ use crate::database::settings::Setting; use crate::mcp_client::Prompt; use crate::os::Os; use crate::telemetry::core::{ - ChatAddedMessage, + ChatAddedMessageParams, ToolUseEventBuilder, }; use crate::telemetry::{ @@ -991,7 +991,6 @@ pub enum ChatState { ExecuteTools, /// Consume the response stream and display to the user. HandleResponseStream(crate::api_client::model::ConversationState), - // HandleResponseStream(SendMessageOutput), /// Compact the chat history. CompactHistory { /// Custom prompt to include as part of history compaction. @@ -1150,11 +1149,10 @@ impl ChatSession { tokio::select! 
{ res = self.compact_history_impl(os, custom_prompt, show_summary, strategy, request_metadata_clone) => res, Ok(_) = ctrl_c_stream.recv() => { - debug!(?request_metadata, "ctrlc received"); + debug!(?request_metadata, "ctrlc received in compact history"); // Wait for handle_response to finish handling the ctrlc. tokio::time::sleep(Duration::from_millis(5)).await; let request_metadata = request_metadata.lock().await.take(); - println!("??? {:?}", request_metadata); self.send_chat_telemetry( os, TelemetryResult::Cancelled, @@ -1914,15 +1912,9 @@ impl ChatSession { /// the response stream. /// /// In order to handle sigints while also keeping track of metadata about how the - /// response stream was handled, we need a couple extra parameters: + /// response stream was handled, we need an extra parameter: /// * `request_metadata_lock` - Updated with the [RequestMetadata] once it has been received /// (either though a successful request, or on an error). - /// * `ctrl_c` - a broadcast receiver for whenever sigints are encountered. - /// * `cancel_token` - a [CancellationToken] to prevent handling the response stream on an error - /// or sigint. - /// - /// The top-level caller is expected to check the value of `request_metadata_lock` when a - /// ctrl+c is sent to get the relevant request metadata. 
async fn handle_response( &mut self, os: &mut Os, @@ -2469,7 +2461,7 @@ impl ChatSession { status_code: Option, md: Option<&RequestMetadata>, ) { - let data = ChatAddedMessage { + let data = ChatAddedMessageParams { request_id: md.and_then(|md| md.request_id.clone()), message_id: self.conversation.message_id().map(|s| s.to_owned()), context_file_length: self.conversation.context_message_length(), diff --git a/crates/chat-cli/src/telemetry/core.rs b/crates/chat-cli/src/telemetry/core.rs index 691ec0894c..0d8ec5c4c2 100644 --- a/crates/chat-cli/src/telemetry/core.rs +++ b/crates/chat-cli/src/telemetry/core.rs @@ -152,7 +152,7 @@ impl Event { conversation_id, result, data: - ChatAddedMessage { + ChatAddedMessageParams { context_file_length, message_id, request_id, @@ -340,7 +340,7 @@ impl From for CodewhispererterminalChatConversationType { /// Optional fields to add for a chatAddedMessage telemetry event. #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, Default)] -pub struct ChatAddedMessage { +pub struct ChatAddedMessageParams { pub message_id: Option, pub request_id: Option, pub context_file_length: Option, @@ -388,7 +388,7 @@ pub enum EventType { ChatAddedMessage { conversation_id: String, result: TelemetryResult, - data: ChatAddedMessage, + data: ChatAddedMessageParams, }, ToolUseSuggested { conversation_id: String, diff --git a/crates/chat-cli/src/telemetry/mod.rs b/crates/chat-cli/src/telemetry/mod.rs index 62d98696a0..f842b3b1b0 100644 --- a/crates/chat-cli/src/telemetry/mod.rs +++ b/crates/chat-cli/src/telemetry/mod.rs @@ -5,7 +5,7 @@ pub mod endpoint; mod install_method; use core::{ - ChatAddedMessage, + ChatAddedMessageParams, ToolUseEventBuilder, }; use std::str::FromStr; @@ -261,7 +261,7 @@ impl TelemetryThread { database: &Database, conversation_id: String, result: TelemetryResult, - data: ChatAddedMessage, + data: ChatAddedMessageParams, ) -> Result<(), TelemetryError> { let mut event = Event::new(EventType::ChatAddedMessage { 
conversation_id, @@ -465,7 +465,7 @@ impl TelemetryClient { if let EventType::ChatAddedMessage { conversation_id, data: - ChatAddedMessage { + ChatAddedMessageParams { message_id, model, time_to_first_chunk_ms, @@ -655,7 +655,7 @@ mod test { &database, "conv_id".to_owned(), TelemetryResult::Succeeded, - ChatAddedMessage { + ChatAddedMessageParams { message_id: Some("message_id".to_owned()), context_file_length: Some(123), ..Default::default()