diff --git a/Cargo.lock b/Cargo.lock
index c1f12953..5f4c4dfb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2022,6 +2022,8 @@ dependencies = [
  "ordered-float 5.0.0",
  "tempfile",
  "tokio",
+ "tracing",
+ "tracing-test",
  "trait-variant",
 ]
 
@@ -2033,6 +2035,8 @@ dependencies = [
  "colored",
  "optd",
  "tokio",
+ "tracing",
+ "tracing-subscriber",
 ]
 
 [[package]]
@@ -3254,9 +3258,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
 dependencies = [
  "pin-project-lite",
+ "tracing-attributes",
  "tracing-core",
 ]
 
+[[package]]
+name = "tracing-attributes"
+version = "0.1.28"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
 [[package]]
 name = "tracing-core"
 version = "0.1.33"
@@ -3296,6 +3312,27 @@ dependencies = [
  "tracing-log",
 ]
 
+[[package]]
+name = "tracing-test"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
+dependencies = [
+ "tracing-core",
+ "tracing-subscriber",
+ "tracing-test-macro",
+]
+
+[[package]]
+name = "tracing-test-macro"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
+dependencies = [
+ "quote",
+ "syn 2.0.101",
+]
+
 [[package]]
 name = "trait-variant"
 version = "0.1.2"
diff --git a/optd-cli/Cargo.toml b/optd-cli/Cargo.toml
index 237a56a8..13b138b7 100644
--- a/optd-cli/Cargo.toml
+++ b/optd-cli/Cargo.toml
@@ -10,3 +10,5 @@ optd = { path = "../optd" }
 clap = { version = "4.5.38", features = ["derive"] }
 colored = "3.0.0"
 tokio = "1.45.0"
+tracing = "0.1"
+tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "ansi"] }
diff --git a/optd-cli/src/main.rs b/optd-cli/src/main.rs
index 8b9cbccf..383e5c19 100644
--- a/optd-cli/src/main.rs
+++ b/optd-cli/src/main.rs
@@ -44,6 +44,8 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::runtime::Builder;
 use tokio::task::JoinSet;
+use tracing::{Instrument, instrument};
+use tracing_subscriber::{EnvFilter, fmt};
 
 #[derive(Parser)]
 #[command(
@@ -66,10 +68,20 @@ enum Commands {
 }
 
 fn main() -> Result<(), Vec<CompileError>> {
+    // Initialize tracing subscriber
+    let filter = EnvFilter::try_from_default_env()
+        .unwrap_or_else(|_| EnvFilter::new("optd=info,optd::cli=info"));
+    fmt()
+        .with_env_filter(filter)
+        .pretty()
+        .with_ansi(true)
+        .init();
+
+    tracing::info!(target: "optd::cli", "optd-cli starting up");
     let cli = Cli::parse();
 
     let mut udfs = HashMap::new();
-    let udf = Udf {
+    let unimplemented_udf = Udf {
         func: Arc::new(|_, _, _| {
             Box::pin(async move {
                 println!("This user-defined function is unimplemented!");
@@ -77,24 +89,32 @@ fn main() -> Result<(), Vec<CompileError>> {
             })
         }),
     };
-    udfs.insert("unimplemented_udf".to_string(), udf.clone());
+    udfs.insert("unimplemented_udf".to_string(), unimplemented_udf.clone());
 
     match cli.command {
         Commands::Compile(config) => {
+            let compile_span =
+                tracing::info_span!(target: "optd::cli", "compile_file", path = %config.path_str());
+            let _guard = compile_span.enter();
+            tracing::info!("Starting compilation");
             for mock_udf in config.mock_udfs() {
-                udfs.insert(mock_udf.to_string(), udf.clone());
+                udfs.insert(mock_udf.to_string(), unimplemented_udf.clone());
             }
 
             let _ = compile_hir(config, udfs).unwrap_or_else(|errors| handle_errors(&errors));
+            tracing::info!("Compilation completed successfully");
 
             Ok(())
         }
         Commands::RunFunctions(config) => {
-            // TODO(Connor): Add support for running functions with real UDFs.
+            let run_span = tracing::info_span!(target: "optd::cli", "run_dsl_functions", path = %config.path_str());
+            let _guard = run_span.enter();
+            tracing::info!("Starting function execution");
             for mock_udf in config.mock_udfs() {
-                udfs.insert(mock_udf.to_string(), udf.clone());
+                udfs.insert(mock_udf.to_string(), unimplemented_udf.clone());
             }
 
             let hir = compile_hir(config, udfs).unwrap_or_else(|errors| handle_errors(&errors));
+            tracing::info!("Compilation successful, proceeding to run functions");
 
             run_all_functions(&hir)
         }
@@ -108,17 +128,25 @@ struct FunctionResult {
 }
 
 /// Run all functions found in the HIR, marked with [run].
+#[instrument(
+    level = "info",
+    skip(hir),
+    target = "optd::cli",
+    name = "run_all_functions"
+)]
 fn run_all_functions(hir: &HIR) -> Result<(), Vec<CompileError>> {
     println!("\n{} {}\n", "•".green(), "Running functions...".green());
 
-    let functions = find_functions(hir);
+    let functions_to_run = find_functions(hir);
+    tracing::info!(target: "optd::cli", num_functions = functions_to_run.len(), "Found functions to run");
 
-    if functions.is_empty() {
+    if functions_to_run.is_empty() {
         println!("No functions found annotated with [run]");
+        tracing::warn!(target: "optd::cli", "No functions found annotated with [run]");
         return Ok(());
     }
 
-    println!("Found {} functions to run", functions.len());
+    println!("Found {} functions to run", functions_to_run.len());
 
     // Create a multi-threaded runtime for parallel execution.
     // TODO: We increase the stack size by x64 to avoid stack overflow
@@ -128,7 +156,8 @@ fn run_all_functions(hir: &HIR) -> Result<(), Vec<CompileError>> {
         .enable_all()
         .build()
         .unwrap();
-    let function_results = runtime.block_on(run_functions_in_parallel(hir, functions));
+    tracing::debug!(target: "optd::cli", "Tokio runtime initialized for function execution");
+    let function_results = runtime.block_on(run_functions_in_parallel(hir, functions_to_run));
 
     // Process and display function results.
     let success_count = process_function_results(function_results);
@@ -138,6 +167,7 @@ fn run_all_functions(hir: &HIR) -> Result<(), Vec<CompileError>> {
         "Execution Results:".yellow(),
         format!("{} functions executed", success_count).yellow()
     );
+    tracing::info!(target: "optd::cli", success_count, "Function execution finished");
 
     Ok(())
 }
 
@@ -147,25 +177,36 @@ async fn run_functions_in_parallel(hir: &HIR, functions: Vec<String>) -> Vec<FunctionResult> {
-        set.spawn(async move {
-            // Create a continuation that returns itself.
-            let result_handler: Continuation<Value, Value> =
-                Arc::new(|value| Box::pin(async move { value }));
-
-            // Launch the function with an empty vector of arguments.
-            let result = engine.launch(&name, vec![], result_handler).await;
-            FunctionResult { name, result }
-        });
+        set.spawn(
+            async move {
+                // Create a continuation that returns itself.
+                let result_handler: Continuation<Value, Value> =
+                    Arc::new(|value| Box::pin(async move { value }));
+
+                tracing::debug!(target: "optd::cli", function_name = %name, "Launching function");
+                // Launch the function with an empty vector of arguments.
+                let result = engine.launch(&name, vec![], result_handler).await;
+                tracing::debug!(target: "optd::cli", function_name = %name, "Function launch completed");
+                FunctionResult { name, result }
+            }
+            .instrument(tracing::info_span!(target: "optd::cli", "run_dsl_function", function_name = %function_name)),
+        );
     }
 
     // Collect all function results.
     let mut results = Vec::new();
     while let Some(result) = set.join_next().await {
         if let Ok(function_result) = result {
+            tracing::debug!(target: "optd::cli", function_name = %function_result.name, "Function task completed");
             results.push(function_result);
         }
     }
@@ -174,20 +215,24 @@ async fn run_functions_in_parallel(hir: &HIR, functions: Vec<String>) -> Vec<FunctionResult> {
 fn process_function_results(function_results: Vec<FunctionResult>) -> usize {
     let mut success_count = 0;
     for function_result in function_results {
-        println!("\n{} {}", "Function:".blue(), function_result.name);
+        println!("\n{} {}", "Function:".blue(), &function_result.name);
 
         match function_result.result {
             EngineResponse::Return(value, _) => {
+                tracing::trace!(target: "optd::cli", function_name = %function_result.name, "Function returned a value");
                 // Check if the result is a failure.
                 if matches!(value.data, CoreData::Fail(_)) {
+                    tracing::warn!(target: "optd::cli", function_name = %function_result.name, "Function failed: {}", value);
                     println!("  {}: Function failed: {}", "Error".red(), value);
                 } else {
                     println!("  {}: {}", "Result".green(), value);
                     success_count += 1;
+                    tracing::debug!(target: "optd::cli", function_name = %function_result.name, "Function succeeded. Result: {}", value);
                 }
             }
             _ => unreachable!(), // For now, unless we add a special UDF that builds a group / goal.
@@ -214,6 +259,7 @@ fn find_functions(hir: &HIR) -> Vec<String> {
 
 /// Display error details and exit the program.
 fn handle_errors(errors: &[CompileError]) -> ! {
+    tracing::error!(target: "optd::cli", num_errors = errors.len(), "Compilation failed");
     eprintln!(
         "\n{} {}\n",
         "•".yellow(),
diff --git a/optd/Cargo.toml b/optd/Cargo.toml
index 275a5ef4..49d93e92 100644
--- a/optd/Cargo.toml
+++ b/optd/Cargo.toml
@@ -20,6 +20,7 @@ once_cell = "1.21.3"
 ordered-float = "5.0.0"
 tempfile = "3.20.0"
 tokio = { version = "1.45.0", features = ["macros", "rt"] }
+tracing = "0.1"
 trait-variant = "0.1.2"
 
 [dev-dependencies]
@@ -28,3 +29,4 @@ tokio = { version = "1.45.0", features = [
     "rt-multi-thread",
     "test-util"
 ] }
+tracing-test = "0.2"
diff --git a/optd/src/catalog/iceberg.rs b/optd/src/catalog/iceberg.rs
index 3eb21b6d..db6de0e5 100644
--- a/optd/src/catalog/iceberg.rs
+++ b/optd/src/catalog/iceberg.rs
@@ -26,6 +26,12 @@ impl OptdIcebergCatalog {
     }
 
     /// Retrieves a [`Table`] from the catalog.
+    #[tracing::instrument(
+        level = "debug",
+        skip_all,
+        fields(table_name = %table_name),
+        target = "optd::catalog"
+    )]
     async fn get_table(&self, table_name: &str) -> Result {
         let namespace_ident = NamespaceIdent::new(DEFAULT_NAMESPACE.to_string());
         let table_ident = TableIdent::new(namespace_ident, table_name.to_string());
diff --git a/optd/src/demo/mod.rs b/optd/src/demo/mod.rs
index 5098085d..58214550 100644
--- a/optd/src/demo/mod.rs
+++ b/optd/src/demo/mod.rs
@@ -14,6 +14,7 @@ use tokio::{
     sync::mpsc,
     time::{sleep, timeout},
 };
+use tracing::instrument;
 
 pub async fn properties(
     args: Vec<Value>,
@@ -31,7 +32,7 @@ pub async fn properties(
 
     retriever.get_properties(group_id).await
 }
-
+#[instrument(target = "optd::demo", level = "info", name = "run_demo")]
 async fn run_demo() {
     // Compile the HIR.
     let config = Config::new("src/demo/demo.opt".into());
@@ -100,6 +101,7 @@ async fn run_demo() {
 mod demo {
     use super::*;
 
+    #[tracing_test::traced_test]
     #[tokio::test]
     async fn test_optimizer_demo() {
         run_demo().await
diff --git a/optd/src/dsl/compile.rs b/optd/src/dsl/compile.rs
index 2af465c3..18fa2e70 100644
--- a/optd/src/dsl/compile.rs
+++ b/optd/src/dsl/compile.rs
@@ -77,11 +77,19 @@ pub struct Verbosity {
 }
 
 /// Compiles a file into the [`HIR`].
+#[tracing::instrument(level = "info", skip(config, udfs), fields( + source_path = %config.path_str(), + udf_count = udfs.len() +), target = "optd::dsl::compile")] pub fn compile_hir(config: Config, udfs: HashMap) -> Result> { let source_path = config.path_str(); // If we cannot find the file we can't compile anything, so exit immediately. let source = fs::read_to_string(&config.path).unwrap_or_else(|e| { + tracing::error!(target: "optd::dsl::compile", + error = %e, + "Failed to read source file" + ); if e.kind() == std::io::ErrorKind::NotFound { eprintln!( "{} {}", @@ -99,13 +107,20 @@ pub fn compile_hir(config: Config, udfs: HashMap) -> Result) -> Result) -> Result) -> Result) -> Result Result> { let mut errors = Vec::new(); + // Step 1: Lexing + tracing::debug!(target: "optd::dsl::compile", "Starting lexical analysis"); let (tokens_opt, lex_errors) = lex(source, &config.path_str()); + let lex_error_count = lex_errors.len(); errors.extend(lex_errors); + + if lex_error_count > 0 { + tracing::warn!(target: "optd::dsl::compile", + error_count = lex_error_count, + "Lexical analysis completed with errors" + ); + } else { + tracing::debug!(target: "optd::dsl::compile", "Lexical analysis completed successfully"); + } + match tokens_opt { Some(tokens) => { + tracing::debug!(target: "optd::dsl::compile", + token_count = tokens.len(), + "Starting syntax analysis" + ); // Step 2: Parsing let (ast_opt, parse_errors) = parse_module(tokens, source, &config.path_str()); + let parse_error_count = parse_errors.len(); errors.extend(parse_errors); + + if parse_error_count > 0 { + tracing::warn!(target: "optd::dsl::compile", + error_count = parse_error_count, + "Syntax analysis completed with errors" + ); + } else { + tracing::debug!(target: "optd::dsl::compile", "Syntax analysis completed successfully"); + } + match ast_opt { - Some(ast) if errors.is_empty() => Ok(ast), - _ => Err(errors), + Some(ast) if errors.is_empty() => { + tracing::debug!(target: "optd::dsl::compile", "Parse phase completed successfully"); + Ok(ast) + } + _ => { + tracing::error!(target: "optd::dsl::compile", + total_errors = errors.len(), + "Parse phase failed with errors" + ); + Err(errors) + } } } - None => Err(errors), + None => { + tracing::error!(target: "optd::dsl::compile", + error_count = errors.len(), + "Lexical analysis failed, no tokens produced" + ); + Err(errors) + } } } @@ -188,12 +274,21 @@ pub fn parse(source: &str, config: &Config) -> Result> /// /// This function performs semantic analysis on the AST and converts it /// to a typed High-level Intermediate Representation (HIR). +#[tracing::instrument(level = "info", skip(source, ast, udfs), fields( + udf_count = udfs.len() +), target = "optd::dsl::compile")] pub fn ast_to_hir( source: &str, ast: Module, udfs: HashMap, ) -> Result<(HIR, TypeRegistry), CompileError> { + tracing::debug!(target: "optd::dsl::compile", "Converting AST to typed HIR"); + from_ast(&ast, udfs).map_err(|err_kind| { + tracing::error!(target: "optd::dsl::compile", + error = ?err_kind, + "AST to HIR conversion failed" + ); CompileError::AnalyzerError(AnalyzerError::new(source.to_string(), *err_kind)) }) } @@ -221,25 +316,47 @@ pub fn registry_check( /// 2. Building type constraints based on the annotated TypedSpan nodes. /// 3. Resolving these constraints to infer concrete types. /// 4. Transforming the typed HIR into its final form. 
+#[tracing::instrument(
+    level = "info",
+    skip(source, hir, registry),
+    target = "optd::dsl::compile"
+)]
 pub fn infer(
     source: &str,
     hir: HIR,
     registry: &mut TypeRegistry,
 ) -> Result<HIR, CompileError> {
+    tracing::debug!(target: "optd::dsl::compile", "Starting type inference");
+
     // Step 1 & 2: Perform scope checking and generate type constraints
     // This traverses the HIR, verifies scopes, and creates constraints for all expressions
+    tracing::trace!(target: "optd::dsl::compile", "Generating type constraints");
     registry.generate_constraints(&hir).map_err(|err_kind| {
+        tracing::error!(target: "optd::dsl::compile",
+            error = ?err_kind,
+            "Type constraint generation failed"
+        );
         CompileError::AnalyzerError(AnalyzerError::new(source.to_string(), *err_kind))
     })?;
 
     // Step 3: Resolve constraints.
+    tracing::trace!(target: "optd::dsl::compile", "Resolving type constraints");
     registry.resolve().map_err(|err_kind| {
+        tracing::error!(target: "optd::dsl::compile",
+            error = ?err_kind,
+            "Type constraint resolution failed"
+        );
         CompileError::AnalyzerError(AnalyzerError::new(source.to_string(), *err_kind))
     })?;
 
     // Step 4: Transform HIR
     // After type inference, transform the HIR into its final form with complete type information.
+    tracing::trace!(target: "optd::dsl::compile", "Transforming HIR to final form");
     into_hir(hir, registry).map_err(|err_kind| {
+        tracing::error!(target: "optd::dsl::compile",
+            error = ?err_kind,
+            "HIR transformation failed"
+        );
         CompileError::AnalyzerError(AnalyzerError::new(source.to_string(), *err_kind))
     })
 }
diff --git a/optd/src/dsl/engine/eval/expr.rs b/optd/src/dsl/engine/eval/expr.rs
index 8cded69b..8535055b 100644
--- a/optd/src/dsl/engine/eval/expr.rs
+++ b/optd/src/dsl/engine/eval/expr.rs
@@ -8,6 +8,7 @@ use crate::dsl::analyzer::hir::{
 use crate::dsl::engine::{Continuation, Engine, EngineResponse};
 use ExprKind::*;
 use std::sync::Arc;
+use tracing::instrument;
 
 impl Engine {
     /// Evaluates an if-then-else expression.
     ///
@@ -21,6 +22,7 @@ impl Engine {
     /// * `then_expr` - The expression to evaluate if condition is true.
     /// * `else_expr` - The expression to evaluate if condition is false.
     /// * `k` - The continuation to receive evaluation results.
+    #[instrument(level = "trace", skip_all, target = "optd::dsl::engine::eval")]
     pub(crate) async fn evaluate_if_then_else(
         self,
         cond: Arc<Expr>,
@@ -36,9 +38,12 @@ impl Engine {
         Box::pin(capture!([then_expr, engine, else_expr, k], async move {
             match value.data {
                 CoreData::Literal(Literal::Bool(b)) => {
+                    tracing::trace!(target: "optd::dsl::engine::eval", condition_result = b, "Condition evaluated");
                     if b {
+                        tracing::trace!(target: "optd::dsl::engine::eval", "Taking then branch");
                         engine.evaluate(then_expr, k).await
                     } else {
+                        tracing::trace!(target: "optd::dsl::engine::eval", "Taking else branch");
                         engine.evaluate(else_expr, k).await
                     }
                 }
@@ -60,6 +65,12 @@ impl Engine {
     /// * `binding` - The binding to evaluate and bind to the context.
     /// * `after` - The expression to evaluate in the updated context.
     /// * `k` - The continuation to receive evaluation results.
+    #[instrument(
+        level = "trace",
+        skip_all,
+        fields(binding_name = %binding.name),
+        target = "optd::dsl::engine::eval"
+    )]
     pub(crate) async fn evaluate_let_binding(
         self,
         binding: LetBinding,
@@ -202,6 +213,12 @@ impl Engine {
     /// * `called` - The called expression to evaluate.
     /// * `args` - The argument expressions to evaluate.
     /// * `k` - The continuation to receive evaluation results.
+    #[instrument(
+        level = "trace",
+        skip_all,
+        fields(num_args = args.len()),
+        target = "optd::dsl::engine::eval"
+    )]
     pub(crate) async fn evaluate_call(
         self,
         called: Arc<Expr>,
@@ -214,26 +231,35 @@ impl Engine {
             called,
             Arc::new(move |called_value| {
                 Box::pin(capture!([args, engine, k], async move {
+                    tracing::trace!(target: "optd::dsl::engine::eval", called_type = ?std::mem::discriminant(&called_value.data), "Call target evaluated");
                     match called_value.data {
                         // Handle function calls.
                         CoreData::Function(FunKind::Closure(params, body)) => {
+                            tracing::trace!(target: "optd::dsl::engine::eval", num_params = params.len(), "Calling closure");
                             engine.evaluate_closure_call(params, body, args, k).await
                         }
                         CoreData::Function(FunKind::Udf(udf)) => {
+                            tracing::trace!(target: "optd::dsl::engine::eval", "Calling Rust UDF");
                             engine.evaluate_rust_udf_call(udf, args, k).await
                         }
                         // Handle collection indexing.
                         CoreData::Array(_) | CoreData::Tuple(_) | CoreData::Struct(_, _) => {
+                            tracing::trace!(target: "optd::dsl::engine::eval", "Performing indexed access on collection");
                             engine.evaluate_indexed_access(called_value, args, k).await
                         }
-                        CoreData::Map(_) => engine.evaluate_map_lookup(called_value, args, k).await,
+                        CoreData::Map(_) => {
+                            tracing::trace!(target: "optd::dsl::engine::eval", "Performing map lookup");
+                            engine.evaluate_map_lookup(called_value, args, k).await
+                        },
                         // Handle operator field accesses.
                         CoreData::Logical(op) => {
+                            tracing::trace!(target: "optd::dsl::engine::eval", "Accessing logical operator field");
                             engine.evaluate_logical_operator_access(op, args, k).await
                         }
                         CoreData::Physical(op) => {
+                            tracing::trace!(target: "optd::dsl::engine::eval", "Accessing physical operator field");
                             engine.evaluate_physical_operator_access(op, args, k).await
                         }
diff --git a/optd/src/dsl/engine/eval/match.rs b/optd/src/dsl/engine/eval/match.rs
index 9fc853be..e33a2f07 100644
--- a/optd/src/dsl/engine/eval/match.rs
+++ b/optd/src/dsl/engine/eval/match.rs
@@ -11,6 +11,7 @@ use Materializable::*;
 use PatternKind::*;
 use futures::future::BoxFuture;
 use std::sync::Arc;
+use tracing::instrument;
 
 /// A type representing a match result, which is a value and an optional context.
 ///
@@ -28,6 +29,12 @@ impl Engine {
     /// * `expr` - The expression to match against patterns.
     /// * `match_arms` - The list of pattern-expression pairs to try.
     /// * `k` - The continuation to receive evaluation results.
+    #[instrument(
+        level = "trace",
+        skip_all,
+        fields(num_arms = match_arms.len()),
+        target = "optd::dsl::engine::eval"
+    )]
     pub(crate) async fn evaluate_pattern_match(
         self,
         expr: Arc<Expr>,
@@ -45,6 +52,11 @@ impl Engine {
             expr,
             Arc::new(move |value| {
                 Box::pin(capture!([match_arms, engine, k], async move {
+                    tracing::trace!(
+                        target: "optd::dsl::engine::eval",
+                        value_type = ?std::mem::discriminant(&value.data),
+                        "Expression evaluated, trying match arms"
+                    );
                     // Try to match against each arm in order.
                     engine.try_match_arms(value, match_arms, k).await
                 }))
@@ -118,6 +130,10 @@ impl Engine {
     /// * `pattern` - The pattern to match.
     /// * `ctx` - The current context to extend with bindings.
     /// * `k` - The continuation to receive the match result.
+#[instrument(level = "trace", skip_all, fields(
+    pattern_type = %format!("{:?}", std::mem::discriminant(&pattern.kind)).split('(').next().unwrap_or("Unknown"),
+    value_type = %format!("{:?}", std::mem::discriminant(&value.data)).split('(').next().unwrap_or("Unknown")
+), target = "optd::dsl::engine::eval")]
 fn match_pattern(
     value: Value,
     pattern: Pattern,
@@ -132,24 +148,48 @@ where
         // Simple patterns.
         (Wildcard, _) => k((value, Some(ctx))).await,
         (Literal(pattern_lit), CoreData::Literal(value_lit)) => {
-            let context_opt = (pattern_lit == *value_lit).then_some(ctx);
+            let matches = pattern_lit == *value_lit;
+            let context_opt = matches.then_some(ctx);
             k((value, context_opt)).await
         }
         (EmptyArray, CoreData::Array(arr)) if arr.is_empty() => k((value, Some(ctx))).await,
 
         // Complex patterns.
         (Bind(ident, inner_pattern), _) => {
+            tracing::debug!(target: "optd::dsl::engine",
+                binding_name = %ident,
+                inner_pattern_type = %format!("{:?}", std::mem::discriminant(&inner_pattern.kind)).split('(').next().unwrap_or("Unknown"),
+                "Processing bind pattern"
+            );
             match_bind_pattern(value.clone(), ident, *inner_pattern, ctx, k).await
         }
         (ArrayDecomp(head_pat, tail_pat), CoreData::Array(arr)) => {
+            tracing::debug!(target: "optd::dsl::engine",
+                array_length = arr.len(),
+                "Processing array decomposition pattern"
+            );
             if arr.is_empty() {
+                tracing::trace!(target: "optd::dsl::engine", "Array decomposition failed: empty array");
                 return k((value, None)).await;
             }
 
             match_array_pattern(*head_pat, *tail_pat, arr, ctx, k).await
         }
         (Struct(pat_name, pat_fields), CoreData::Struct(val_name, val_fields)) => {
-            if pat_name != *val_name || pat_fields.len() != val_fields.len() {
+            let name_matches = pat_name == *val_name;
+            let field_count_matches = pat_fields.len() == val_fields.len();
+            tracing::debug!(target: "optd::dsl::engine",
+                pattern_struct_name = %pat_name,
+                value_struct_name = %val_name,
+                pattern_field_count = pat_fields.len(),
+                value_field_count = val_fields.len(),
+                name_matches = name_matches,
+                field_count_matches = field_count_matches,
+                "Processing struct pattern"
+            );
+
+            if !name_matches || !field_count_matches {
+                tracing::trace!(target: "optd::dsl::engine", "Struct pattern match failed: name or field count mismatch");
                 return k((value, None)).await;
             }
 
@@ -158,10 +198,26 @@ where
         // Materialized logical operators
         (Operator(op_pattern), CoreData::Logical(Materialized(log_op))) => {
-            if op_pattern.tag != log_op.operator.tag
-                || op_pattern.data.len() != log_op.operator.data.len()
-                || op_pattern.children.len() != log_op.operator.children.len()
-            {
+            let tag_matches = op_pattern.tag == log_op.operator.tag;
+            let data_count_matches = op_pattern.data.len() == log_op.operator.data.len();
+            let children_count_matches =
+                op_pattern.children.len() == log_op.operator.children.len();
+
+            tracing::debug!(target: "optd::dsl::engine",
+                pattern_tag = %op_pattern.tag,
+                operator_tag = %log_op.operator.tag,
+                pattern_data_count = op_pattern.data.len(),
+                operator_data_count = log_op.operator.data.len(),
+                pattern_children_count = op_pattern.children.len(),
+                operator_children_count = log_op.operator.children.len(),
+                tag_matches = tag_matches,
+                data_count_matches = data_count_matches,
+                children_count_matches = children_count_matches,
+                "Processing materialized logical operator pattern"
+            );
+
+            if !tag_matches || !data_count_matches || !children_count_matches {
+                tracing::trace!(target: "optd::dsl::engine", "Materialized logical operator pattern match failed");
                 return k((value, None)).await;
             }
 
@@ -170,10 +226,26 @@ where
         // Materialized physical operators
         (Operator(op_pattern), CoreData::Physical(Materialized(phys_op))) => {
-            if op_pattern.tag != phys_op.operator.tag
-                || op_pattern.data.len() != phys_op.operator.data.len()
-                || op_pattern.children.len() != phys_op.operator.children.len()
-            {
+            let tag_matches = op_pattern.tag == phys_op.operator.tag;
+            let data_count_matches = op_pattern.data.len() == phys_op.operator.data.len();
+            let children_count_matches =
+                op_pattern.children.len() == phys_op.operator.children.len();
+
+            tracing::debug!(target: "optd::dsl::engine",
+                pattern_tag = %op_pattern.tag,
+                operator_tag = %phys_op.operator.tag,
+                pattern_data_count = op_pattern.data.len(),
+                operator_data_count = phys_op.operator.data.len(),
+                pattern_children_count = op_pattern.children.len(),
+                operator_children_count = phys_op.operator.children.len(),
+                tag_matches = tag_matches,
+                data_count_matches = data_count_matches,
+                children_count_matches = children_count_matches,
+                "Processing materialized physical operator pattern"
+            );
+
+            if !tag_matches || !data_count_matches || !children_count_matches {
+                tracing::trace!(target: "optd::dsl::engine", "Materialized physical operator pattern match failed");
                 return k((value, None)).await;
             }
 
@@ -183,11 +255,17 @@ where
         // Unmaterialized logical operators
         (Operator(op_pattern), CoreData::Logical(UnMaterialized(group_id))) => {
+            tracing::debug!(target: "optd::dsl::engine",
+                group_id = ?group_id,
+                pattern_tag = %op_pattern.tag,
+                "Processing unmaterialized logical operator pattern - yielding group"
+            );
             // Yield the group id back to the caller and provide a callback to match expanded value against the pattern.
             EngineResponse::YieldGroup(
                 *group_id,
                 Arc::new(move |expanded_value| {
                     Box::pin(capture!([op_pattern, ctx, k], async move {
+                        tracing::trace!(target: "optd::dsl::engine", "Continuing pattern match with expanded group value");
                         match_pattern(
                             expanded_value,
                             Pattern::new(Operator(op_pattern)),
@@ -202,11 +280,17 @@ where
         // Unmaterialized physical operators
         (Operator(op_pattern), CoreData::Physical(UnMaterialized(goal))) => {
+            tracing::debug!(target: "optd::dsl::engine",
+                goal = ?goal,
+                pattern_tag = %op_pattern.tag,
+                "Processing unmaterialized physical operator pattern - yielding goal"
+            );
             // Yield the goal back to the caller and provide a callback to match expanded value against the pattern.
             EngineResponse::YieldGoal(
                 goal.clone(),
                 Arc::new(move |expanded_value| {
                     Box::pin(capture!([op_pattern, ctx, k], async move {
+                        tracing::trace!(target: "optd::dsl::engine", "Continuing pattern match with expanded goal value");
                         match_pattern(
                             expanded_value,
                             Pattern::new(Operator(op_pattern)),
@@ -220,12 +304,19 @@ where
         }
 
         // No match for other combinations.
-        _ => k((value, None)).await,
+        _ => {
+            tracing::trace!(target: "optd::dsl::engine", "No pattern match found for value/pattern combination");
+            k((value, None)).await
+        }
     }
     })
 }
 
 /// Matches a binding pattern.
+#[tracing::instrument(level = "trace", skip(value, inner_pattern, ctx, k), fields(
+    binding_name = %ident,
+    inner_pattern_type = %format!("{:?}", std::mem::discriminant(&inner_pattern.kind)).split('(').next().unwrap_or("Unknown")
+), target = "optd::dsl::engine")]
 async fn match_bind_pattern(
     value: Value,
     ident: String,
@@ -236,6 +327,8 @@ where
     O: Clone + Send + 'static,
 {
+    tracing::trace!(target: "optd::dsl::engine", "Starting bind pattern match");
+
     // First check if the inner pattern matches without binding.
     match_pattern(
         value,
@@ -245,10 +338,18 @@ where
             Box::pin(capture!([ident, k], async move {
                 // Only bind if the inner pattern matched.
                 if let Some(mut ctx) = ctx_opt {
+                    tracing::debug!(target: "optd::dsl::engine",
+                        binding_name = %ident,
+                        "Inner pattern matched, creating binding"
+                    );
                     // Create a new context with the binding.
                     ctx.bind(ident.clone(), matched_value.clone());
                     k((matched_value, Some(ctx))).await
                 } else {
+                    tracing::trace!(target: "optd::dsl::engine",
+                        binding_name = %ident,
+                        "Inner pattern didn't match, bind pattern failed"
+                    );
                     // Inner pattern didn't match, propagate failure.
                     k((matched_value, None)).await
                 }
@@ -259,6 +360,11 @@ where
 }
 
 /// Matches an array decomposition pattern.
+#[tracing::instrument(level = "trace", skip(head_pattern, tail_pattern, arr, ctx, k), fields(
+    array_length = arr.len(),
+    head_pattern_type = %format!("{:?}", std::mem::discriminant(&head_pattern.kind)).split('(').next().unwrap_or("Unknown"),
+    tail_pattern_type = %format!("{:?}", std::mem::discriminant(&tail_pattern.kind)).split('(').next().unwrap_or("Unknown")
+), target = "optd::dsl::engine")]
 async fn match_array_pattern(
     head_pattern: Pattern,
     tail_pattern: Pattern,
@@ -269,11 +375,20 @@ where
     O: Clone + Send + 'static,
 {
+    tracing::trace!(target: "optd::dsl::engine", "Starting array decomposition pattern match");
+
     // Split array into head and tail.
     let head = arr[0].clone();
     let tail_elements = arr[1..].to_vec();
+    let tail_length = tail_elements.len();
     let tail = Value::new(CoreData::Array(tail_elements));
 
+    tracing::trace!(target: "optd::dsl::engine",
+        head_element_type = %format!("{:?}", std::mem::discriminant(&head.data)).split('(').next().unwrap_or("Unknown"),
+        tail_length = tail_length,
+        "Split array into head and tail for pattern matching"
+    );
+
     // Create components to match sequentially.
     let patterns = vec![head_pattern, tail_pattern];
     let values = vec![head, tail];
@@ -288,6 +403,13 @@ where
             // Check if all parts matched successfully.
             let all_matched = results.iter().all(|(_, ctx_opt)| ctx_opt.is_some());
 
+            tracing::trace!(target: "optd::dsl::engine",
+                head_matched = results[0].1.is_some(),
+                tail_matched = results[1].1.is_some(),
+                all_matched = all_matched,
+                "Array decomposition pattern match results"
+            );
+
             // All matched or not, get the matched values.
             let head_value = results[0].0.clone();
             let tail_value = results[1].0.clone();
@@ -313,9 +435,11 @@ where
                     acc_ctx
                 });
 
+                tracing::debug!(target: "optd::dsl::engine", "Array decomposition pattern matched successfully");
                 // Return the new array with the combined context.
                 k((new_array, Some(combined_ctx))).await
             } else {
+                tracing::trace!(target: "optd::dsl::engine", "Array decomposition pattern match failed");
                 // Return the new array but with None context since match failed.
                 k((new_array, None)).await
             }
@@ -326,6 +450,10 @@ where
 }
 
 /// Matches a struct pattern.
+#[tracing::instrument(level = "trace", skip(field_patterns, field_values, ctx, k), fields(
+    struct_name = %pat_name,
+    field_count = field_patterns.len()
+), target = "optd::dsl::engine")]
 async fn match_struct_pattern(
     pat_name: String,
     field_patterns: Vec<Pattern>,
@@ -336,6 +464,8 @@ where
     O: Clone + Send + 'static,
 {
+    tracing::trace!(target: "optd::dsl::engine", "Starting struct pattern match");
+
     // Match fields sequentially.
     match_components(
         field_patterns,
@@ -346,9 +476,17 @@ where
             // Check if all fields matched successfully.
             let all_matched = results.iter().all(|(_, ctx_opt)| ctx_opt.is_some());
 
+            tracing::trace!(target: "optd::dsl::engine",
+                struct_name = %pat_name,
+                fields_matched = results.iter().filter(|(_, ctx_opt)| ctx_opt.is_some()).count(),
+                total_fields = results.len(),
+                all_matched = all_matched,
+                "Struct pattern field match results"
+            );
+
             // Reconstruct struct with matched field values.
             let matched_values = results.iter().map(|(v, _)| v.clone()).collect();
-            let new_struct = Value::new(CoreData::Struct(pat_name, matched_values));
+            let new_struct = Value::new(CoreData::Struct(pat_name.clone(), matched_values));
 
             if all_matched {
                 // Combine contexts by folding over the results, starting with the base context.
@@ -360,9 +498,17 @@ where
                     acc_ctx
                 });
 
+                tracing::debug!(target: "optd::dsl::engine",
+                    struct_name = %pat_name,
+                    "Struct pattern matched successfully"
+                );
                 // Return the new struct with the combined context.
                 k((new_struct, Some(combined_ctx))).await
             } else {
+                tracing::trace!(target: "optd::dsl::engine",
+                    struct_name = %pat_name,
+                    "Struct pattern match failed"
+                );
                 // Return the new struct but with None context since match failed.
                 k((new_struct, None)).await
             }
diff --git a/optd/src/dsl/engine/mod.rs b/optd/src/dsl/engine/mod.rs
index 92a3d3f6..7a092edd 100644
--- a/optd/src/dsl/engine/mod.rs
+++ b/optd/src/dsl/engine/mod.rs
@@ -75,6 +75,7 @@ impl Engine {
     /// * `self` - The evaluation engine (owned).
     /// * `expr` - The expression to evaluate.
     /// * `k` - The continuation to receive each evaluation result.
+    #[tracing::instrument(level = "trace", skip(self, expr, k), fields(expr_kind = %format!("{:?}", expr.kind).split('(').next().unwrap_or("Unknown")), target="optd::dsl::engine")]
     pub fn evaluate(
         self,
         expr: Arc<Expr>,
@@ -83,6 +84,7 @@ impl Engine {
         use ExprKind::*;
 
         Box::pin(async move {
+            tracing::trace!(target: "optd::dsl::engine", "Evaluating expression");
             match &expr.as_ref().kind {
                 PatternMatch(sub_expr, match_arms) => {
                     self.evaluate_pattern_match(sub_expr.clone(), match_arms.clone(), k)
@@ -128,12 +130,14 @@ impl Engine {
     ///
     /// # Returns
     /// The result of the rule application.
+ #[tracing::instrument(level = "info", skip(self, values, return_k), fields(function_name = %name, num_args = values.len()), target="optd::dsl::engine")] pub async fn launch( self, name: &str, values: Vec, return_k: Continuation, ) -> EngineResponse { + tracing::debug!(target: "optd::dsl::engine", "Launching DSL function"); let rule_call = self.create_rule_call(name, values); self.evaluate( diff --git a/optd/src/memo/memory/implementation.rs b/optd/src/memo/memory/implementation.rs index 3f79f2ac..03a7bbed 100644 --- a/optd/src/memo/memory/implementation.rs +++ b/optd/src/memo/memory/implementation.rs @@ -8,6 +8,7 @@ use hashbrown::{HashMap, HashSet}; use std::collections::VecDeque; impl Memo for MemoryMemo { + #[tracing::instrument(level = "info", skip(self), target = "optd::memo")] async fn debug_dump(&self) -> Result<(), Infallible> { println!("\n===== MEMO TABLE DUMP ====="); println!("---- GROUPS ----"); @@ -86,32 +87,44 @@ impl Memo for MemoryMemo { Ok(()) } + #[tracing::instrument(level = "debug", skip(self), fields(group_id = ?group_id), target = "optd::memo")] async fn get_logical_properties( &self, group_id: GroupId, ) -> Result { let group_id = self.find_repr_group_id(group_id).await?; - Ok(self + tracing::trace!(target: "optd::memo", repr_group_id = ?group_id, "Found representative group for properties lookup"); + + let properties = self .group_info .get(&group_id) .unwrap_or_else(|| panic!("{:?} not found in memo table", group_id)) .logical_properties - .clone()) + .clone(); + + tracing::debug!(target: "optd::memo", properties = ?properties, "Retrieved logical properties"); + Ok(properties) } + #[tracing::instrument(level = "debug", skip(self), fields(group_id = ?group_id), target = "optd::memo")] async fn get_all_logical_exprs( &self, group_id: GroupId, ) -> Result, Infallible> { let group_id = self.find_repr_group_id(group_id).await?; - Ok(self + tracing::trace!(target: "optd::memo", repr_group_id = ?group_id, "Found representative group for expressions lookup"); + + let expressions = self .group_info .get(&group_id) .unwrap_or_else(|| panic!("{:?} not found in memo table", group_id)) .expressions - .clone()) - } + .clone(); + tracing::debug!(target: "optd::memo", num_expressions = expressions.len(), "Retrieved logical expressions from group"); + Ok(expressions) + } + #[tracing::instrument(level = "trace", skip(self), fields(logical_expr_id = ?logical_expr_id), target = "optd::memo")] async fn find_logical_expr_group( &self, logical_expr_id: LogicalExpressionId, @@ -123,18 +136,23 @@ impl Memo for MemoryMemo { .copied()) } + #[tracing::instrument(level = "debug", skip(self, props), fields(logical_expr_id = ?logical_expr_id, properties = ?props), target = "optd::memo")] async fn create_group( &mut self, logical_expr_id: LogicalExpressionId, props: &LogicalProperties, ) -> Result { let logical_expr_id = self.find_repr_logical_expr_id(logical_expr_id).await?; + tracing::trace!(target: "optd::memo", repr_logical_expr_id = ?logical_expr_id, "Found representative for logical expression"); if let Some(group_id) = self.logical_id_to_group_index.get(&logical_expr_id) { + tracing::debug!(target: "optd::memo", existing_group_id = ?group_id, "Expression already belongs to existing group"); return Ok(*group_id); } let group_id = self.next_group_id(); + tracing::debug!(target: "optd::memo", group_id = ?group_id, "Creating new group"); + let group_info = GroupInfo { expressions: HashSet::from([logical_expr_id]), goals: HashMap::new(), @@ -144,6 +162,8 @@ impl Memo for MemoryMemo { 
         self.group_info.insert(group_id, group_info);
         self.logical_id_to_group_index
             .insert(logical_expr_id, group_id);
+
+        tracing::trace!(target: "optd::memo", "Group created and indexed");
         Ok(group_id)
     }
 
@@ -204,6 +224,7 @@ impl Memo for MemoryMemo {
     ///
     /// # Returns
     /// Detailed results of all merges performed, including cascading merges.
+    #[tracing::instrument(level = "info", skip(self), fields(group_id_1 = ?group_id_1, group_id_2 = ?group_id_2), target = "optd::memo")]
     async fn merge_groups(
         &mut self,
         group_id_1: GroupId,
         group_id_2: GroupId,
@@ -216,14 +237,19 @@ impl Memo for MemoryMemo {
             // Find current representatives - skip if already merged.
             let group_id_1 = self.find_repr_group_id(g1).await?;
             let group_id_2 = self.find_repr_group_id(g2).await?;
+            tracing::trace!(target: "optd::memo", repr_g1 = ?group_id_1, repr_g2 = ?group_id_2, "Found representative groups");
+
             if group_id_1 == group_id_2 {
+                tracing::trace!(target: "optd::memo", "Groups already merged, skipping");
                 continue;
             }
 
             // Perform the group merge, creating a new representative.
+            tracing::debug!(target: "optd::memo", "Performing group pair merge");
             let (new_group_id, merge_product) =
                 self.merge_group_pair(group_id_1, group_id_2).await?;
             merge_operations.push(merge_product);
+            tracing::debug!(target: "optd::memo", new_group_id = ?new_group_id, "Group pair merged");
 
             // Process expressions that reference the merged groups,
             // which may trigger additional group merges.
@@ -231,11 +257,13 @@ impl Memo for MemoryMemo {
                 .process_referencing_logical_exprs(group_id_1, group_id_2, new_group_id)
                 .await?;
 
+            tracing::trace!(target: "optd::memo", num_new_pending = new_pending_merges.len(), "Found additional merges to process");
             pending_merges.extend(new_pending_merges);
         }
 
         // Consolidate the merge products by replacing the incremental merges
         // with consolidated results that show the full picture.
+        tracing::debug!(target: "optd::memo", num_merge_operations = merge_operations.len(), "Consolidating merge products");
         let group_merges = self
             .consolidate_merge_group_products(merge_operations)
             .await?;
@@ -243,12 +271,20 @@ impl Memo for MemoryMemo {
         // Now handle goal merges: we do not need to pass any extra parameters as
         // the goals to merge are gathered in the `goals` member of each new
         // representative group.
+        tracing::debug!(target: "optd::memo", num_group_merges = group_merges.len(), "Processing dependent goal merges");
         let goal_merges = self.merge_dependent_goals(&group_merges).await?;
 
         // Finally, we need to recursively merge the physical expressions that are
         // dependent on the merged goals (and the recursively merged expressions themselves).
+ tracing::debug!(target: "optd::memo", num_goal_merges = goal_merges.len(), "Processing dependent physical expression merges"); let expr_merges = self.merge_dependent_physical_exprs(&goal_merges).await?; + tracing::info!(target: "optd::memo", + num_group_merges = group_merges.len(), + num_goal_merges = goal_merges.len(), + num_expr_merges = expr_merges.len(), + "Group merge operation completed"); + Ok(MergeProducts { group_merges, goal_merges, @@ -256,13 +292,15 @@ impl Memo for MemoryMemo { }) } + #[tracing::instrument(level = "debug", skip(self), fields(goal_id = ?goal_id), target = "optd::memo")] async fn get_all_goal_members( &self, goal_id: GoalId, ) -> Result, Infallible> { let repr_goal_id = self.find_repr_goal_id(goal_id).await?; + tracing::trace!(target: "optd::memo", repr_goal_id = ?repr_goal_id, "Found representative goal"); - let members = self + let members: HashSet = self .goal_info .get(&repr_goal_id) .expect("Goal not found in memo table") @@ -271,9 +309,11 @@ impl Memo for MemoryMemo { .map(|&member_id| self.find_repr_goal_member_id(member_id)) .collect(); + tracing::debug!(target: "optd::memo", num_members = members.len(), "Retrieved goal members"); Ok(members) } + #[tracing::instrument(level = "debug", skip(self), fields(goal_id = ?goal_id, member_id = ?member_id), target = "optd::memo")] async fn add_goal_member( &mut self, goal_id: GoalId, @@ -284,15 +324,21 @@ impl Memo for MemoryMemo { let mut current_members = self.get_all_goal_members(goal_id).await?; let repr_member_id = self.find_repr_goal_member_id(member_id); + tracing::trace!(target: "optd::memo", repr_member_id = ?repr_member_id, "Found representative member ID"); + let added = current_members.insert(repr_member_id); if added { let repr_goal_id = self.find_repr_goal_id(goal_id).await?; + tracing::debug!(target: "optd::memo", repr_goal_id = ?repr_goal_id, "Adding new member to goal"); self.goal_info .get_mut(&repr_goal_id) .expect("Goal not found in memo table") .members .insert(repr_member_id); + tracing::info!(target: "optd::memo", "Goal member added successfully"); + } else { + tracing::debug!(target: "optd::memo", "Member already exists in goal"); } Ok(added) diff --git a/optd/src/optimizer/handlers.rs b/optd/src/optimizer/handlers.rs index 4d696c64..cf5de494 100644 --- a/optd/src/optimizer/handlers.rs +++ b/optd/src/optimizer/handlers.rs @@ -8,6 +8,7 @@ use crate::{ }, }; use tokio::sync::mpsc::Sender; +use tracing::{Instrument, Level}; impl Optimizer { /// This method initiates the optimization process for a logical plan by launching @@ -20,6 +21,7 @@ impl Optimizer { /// /// # Returns /// * `Result<(), Error>` - Success or error during processing. + #[tracing::instrument(level = "info", skip(self, plan, physical_tx), fields(task_id = ?optimize_plan_task_id, plan_root_op = %plan.0.tag), target="optd::optimizer::handlers")] pub(super) async fn process_optimize_request( &mut self, plan: LogicalPlan, @@ -28,11 +30,11 @@ impl Optimizer { ) -> Result<(), M::MemoError> { use JobKind::*; use LogicalIngest::*; - use OptimizerMessage::*; match self.probe_ingest_logical_plan(&plan.clone().into()).await? { Found(group_id) => { // The goal represents what we want to achieve: optimize the root group + tracing::debug!(target: "optd::optimizer::handlers", group_id = ?group_id, "Plan found in memo, launching optimize plan task"); // with no specific physical properties required. 
                 let goal = Goal(group_id, PhysicalProperties(None));
                 let goal_id = self.memo.get_goal_id(&goal).await?;
@@ -42,6 +44,7 @@ impl Optimizer {
                     .await?;
             }
             Missing(logical_exprs) => {
+                tracing::debug!(target: "optd::optimizer::handlers", num_missing_exprs = logical_exprs.len(), "Plan not fully in memo, scheduling derive jobs");
                 // Store the request as a pending message that will be processed
                 // once all create task dependencies are resolved.
                 let pending_dependencies = logical_exprs
                     .iter()
                     .map(|logical_expr_id| {
                         self.schedule_job(optimize_plan_task_id, Derive(logical_expr_id))
                     })
-                    .collect();
+                    .collect::<Vec<_>>();
 
+                tracing::event!(target: "optd::optimizer::handlers", Level::DEBUG, num_dependencies = pending_dependencies.len(), "Request for optimize_plan_task pending");
                 self.pending_messages.push(PendingMessage::new(
-                    Request(OptimizeRequest { plan, physical_tx }, optimize_plan_task_id),
+                    OptimizerMessage::Request(
+                        OptimizeRequest { plan, physical_tx },
+                        optimize_plan_task_id,
+                    ),
                     pending_dependencies,
                 ));
             }
@@ -72,6 +79,7 @@ impl Optimizer {
     ///
     /// # Returns
     /// * `Result<(), Error>` - Success or error during processing.
+    #[tracing::instrument(level = "info", skip(self, plan), fields(target_group_id = ?group_id, job_id = ?job_id, plan_type = %std::any::type_name_of_val(&plan)), target="optd::optimizer::handlers")]
     pub(super) async fn process_new_logical_partial(
         &mut self,
         plan: PartialLogicalPlan,
@@ -81,34 +89,40 @@ impl Optimizer {
         use EngineProduct::*;
         use JobKind::*;
         use LogicalIngest::*;
-        use OptimizerMessage::*;
 
         let group_id = self.memo.find_repr_group_id(group_id).await?;
+        tracing::debug!(target: "optd::optimizer::handlers", "Processing new logical partial for group {}", group_id.0);
 
         match self.probe_ingest_logical_plan(&plan).await? {
             Found(new_group_id) if new_group_id != group_id => {
                 // Atomically perform the merge in the memo and process all results.
+                tracing::debug!(target: "optd::optimizer::handlers", "Merging group {} into {}", new_group_id.0, group_id.0);
                 let merge_results = self.memo.merge_groups(group_id, new_group_id).await?;
-
                 self.handle_merge_result(merge_results).await?;
             }
             Found(_) => {
+                tracing::debug!(target: "optd::optimizer::handlers", "New logical partial already exists in group or an equivalent group, no action needed.");
                 // Group already exists, nothing to merge or do.
             }
             Missing(logical_exprs) => {
+                tracing::debug!(target: "optd::optimizer::handlers", num_missing_exprs = logical_exprs.len(), "New logical partial requires deriving properties for new expressions.");
                 // Store the request as a pending message that will be processed
                 // once all create task dependencies are resolved.
                 let related_task_id = self.get_related_task_id(job_id);
                 let pending_dependencies = logical_exprs
                     .iter()
                     .cloned()
-                    .map(|logical_expr_id| {
-                        self.schedule_job(related_task_id, Derive(logical_expr_id))
-                    })
-                    .collect();
+                    .map(
+                        |logical_expr_id| {
+                            tracing::trace!(target: "optd::optimizer::handlers", "Scheduling Derive job for expr_id={:?} due to new logical partial, task_id={:?}", logical_expr_id, related_task_id);
+                            self.schedule_job(related_task_id, Derive(logical_expr_id))
+                        }
+                    )
+                    .collect::<Vec<_>>();
 
+                tracing::event!(target: "optd::optimizer::handlers", Level::DEBUG, num_dependencies = pending_dependencies.len(), "NewLogicalPartial processing pending");
                 self.pending_messages.push(PendingMessage::new(
-                    Product(NewLogicalPartial(plan, group_id), job_id),
+                    OptimizerMessage::Product(NewLogicalPartial(plan, group_id), job_id),
                     pending_dependencies,
                 ));
             }
@@ -127,6 +141,7 @@ impl Optimizer {
     ///
     /// # Returns
     /// * `Result<(), Error>` - Success or error during processing.
+    #[tracing::instrument(level = "info", skip(self, plan), fields(target_goal_id = ?goal_id, job_id = ?job_id, plan_type = %std::any::type_name_of_val(&plan)), target="optd::optimizer::handlers")]
     pub(super) async fn process_new_physical_partial(
         &mut self,
         plan: PartialPhysicalPlan,
@@ -138,14 +153,17 @@ impl Optimizer {
         let goal_id = self.memo.find_repr_goal_id(goal_id).await?;
         let related_task_id = self.get_related_task_id(job_id);
+        tracing::debug!(target: "optd::optimizer::handlers", "Processing new physical partial for goal {}", goal_id.0);
 
         let member_id = self.probe_ingest_physical_plan(&plan).await?;
         let added = self.memo.add_goal_member(goal_id, member_id).await?;
+        tracing::debug!(target: "optd::optimizer::handlers", ?member_id, added_to_goal = added, "Processed physical plan into goal member");
         match member_id {
             PhysicalExpressionId(_) => {
                 // TODO: Here we would launch costing tasks based on the design.
             }
             GoalId(goal_id) => {
+                tracing::debug!(target: "optd::optimizer::handlers", sub_goal_id = ?goal_id, "New physical partial resulted in a sub-goal");
                 if added {
                     // Optimize the new sub-goal and add to task graph.
                     let sub_optimize_task_id = self.ensure_optimize_goal_task(goal_id).await?;
@@ -175,12 +193,14 @@ impl Optimizer {
     ///
     /// # Returns
     /// * `Result<(), Error>` - Success or error during processing.
+    #[tracing::instrument(level = "info", skip(self, properties), fields(expr_id = ?expression_id, job_id = ?job_id), target="optd::optimizer::handlers")]
     pub(super) async fn process_create_group(
         &mut self,
         expression_id: LogicalExpressionId,
         properties: &LogicalProperties,
         job_id: JobId,
     ) -> Result<(), M::MemoError> {
+        tracing::debug!(target: "optd::optimizer::handlers", "Creating group for expression");
         self.memo.create_group(expression_id, properties).await?;
         self.resolve_dependencies(job_id).await;
 
         Ok(())
     }
 
@@ -196,6 +216,7 @@ impl Optimizer {
     ///
     /// # Returns
     /// * `Result<(), Error>` - Success or error during processing.
+ #[tracing::instrument(level = "info", skip(self, continuation), fields(target_group_id = ?group_id, job_id = ?job_id), target="optd::optimizer::handlers")] pub(super) async fn process_group_subscription( &mut self, group_id: GroupId, @@ -203,6 +224,7 @@ impl Optimizer { job_id: JobId, ) -> Result<(), M::MemoError> { let parent_task_id = self.get_related_task_id(job_id); + tracing::debug!(target: "optd::optimizer::handlers", "Processing group subscription for parent task {:?}", parent_task_id); self.launch_fork_logical_task(group_id, continuation, parent_task_id) .await } @@ -216,22 +238,30 @@ impl Optimizer { /// /// # Returns /// * `Result<(), Error>` - Success or error during processing. + #[tracing::instrument(level = "info", skip(self, sender), fields(target_group_id = ?group_id), target="optd::optimizer::handlers")] pub(super) async fn process_retrieve_properties( &mut self, group_id: GroupId, sender: Sender, ) -> Result<(), M::MemoError> { let props = self.memo.get_logical_properties(group_id).await?; + tracing::debug!(target: "optd::optimizer::handlers", "Retrieved properties for group, sending to requester"); // We don't want to make a job out of this, as it is merely a way to unblock // an existing pending job. We send it to the channel without blocking the // main co-routine. + let span = tracing::debug_span!( + target: "optd::optimizer::handlers", + "send_properties_job", + group_id = ?group_id + ); tokio::spawn(async move { sender .send(props) - .await - .expect("Failed to send properties - channel closed."); - }); + .await.unwrap_or_else(|e| { + tracing::warn!(target: "optd::optimizer::handlers", "Failed to send properties - channel closed: {}", e); + }); + }.instrument(span)); Ok(()) } @@ -244,6 +274,7 @@ impl Optimizer { /// /// # Parameters /// * `completed_job_id` - ID of the completed job. + #[tracing::instrument(level = "debug", skip(self), fields(completed_job_id = ?completed_job_id), target="optd::optimizer::handlers")] async fn resolve_dependencies(&mut self, completed_job_id: JobId) { // Update dependencies and collect ready messages. let ready_indices: Vec<_> = self @@ -257,18 +288,23 @@ impl Optimizer { .collect(); // Process all ready messages (in reverse order to avoid index issues when removing). - for i in ready_indices.iter().rev() { - let pending = self.pending_messages.swap_remove(*i); + if !ready_indices.is_empty() { + tracing::debug!(target: "optd::optimizer::handlers", num_ready_messages = ready_indices.len(), "Processing messages with resolved dependencies"); + for i in ready_indices.iter().rev() { + let pending = self.pending_messages.swap_remove(*i); + let msg_type_name = std::any::type_name_of_val(&pending.message); + tracing::trace!(target: "optd::optimizer::handlers", "Re-scheduling message of type: {}", msg_type_name); - // Re-send the message to be processed in a new co-routine to not block the - // main co-routine. - let message_tx = self.message_tx.clone(); - tokio::spawn(async move { - message_tx - .send(pending.message) - .await - .expect("Failed to re-send ready message - channel closed."); - }); + // Re-send the message to be processed in a new co-routine to not block the + // main co-routine. 
+                let span = tracing::debug_span!(target: "optd::optimizer::handlers", "re-schedule_pending_message");
+                let message_tx = self.message_tx.clone();
+                tokio::spawn(async move {
+                    if let Err(e) = message_tx.send(pending.message).await {
+                        tracing::error!(target: "optd::optimizer::handlers", "Failed to re-send ready message - channel closed: {}", e);
+                    }
+                }.instrument(span));
+            }
         }
     }
 }
diff --git a/optd/src/optimizer/jobs/execute.rs b/optd/src/optimizer/jobs/execute.rs
index 130c3d82..7b9c5c78 100644
--- a/optd/src/optimizer/jobs/execute.rs
+++ b/optd/src/optimizer/jobs/execute.rs
@@ -19,6 +19,7 @@ use crate::{
 };
 use std::sync::Arc;
 use tokio::sync::mpsc::Sender;
+use tracing::Instrument;
 
 impl Optimizer {
     /// Executes a job to derive logical properties for a logical expression.
@@ -54,22 +55,30 @@ impl Optimizer {
             // with how costing is handled (i.e. with $ and *).
         );
 
+        tracing::debug!(target: "optd::optimizer::jobs", "Launching DSL engine for 'derive'");
         let message_tx = self.message_tx.clone();
-        tokio::spawn(async move {
-            let response = engine
-                .launch(
-                    "derive",
-                    vec![plan],
-                    Arc::new(move |logical_props| {
-                        Box::pin(async move {
-                            CreateGroup(expression_id, value_to_logical_properties(&logical_props))
-                        })
-                    }),
-                )
-                .await;
+        let span = tracing::debug_span!(target: "optd::optimizer::jobs", "derive_job", job_id = ?job_id, expression_id = ?expression_id);
+        tokio::spawn(
+            async move {
+                let response = engine
+                    .launch(
+                        "derive",
+                        vec![plan],
+                        Arc::new(move |logical_props| {
+                            Box::pin(async move {
+                                CreateGroup(
+                                    expression_id,
+                                    value_to_logical_properties(&logical_props),
+                                )
+                            })
+                        }),
+                    )
+                    .await;
 
-            Self::process_engine_response(job_id, message_tx, response).await;
-        });
+                Self::process_engine_response(job_id, message_tx, response).await;
+            }
+            .instrument(span),
+        );
 
         Ok(())
     }
 
@@ -105,25 +114,33 @@ impl Optimizer {
             Some(group_id),
         );
 
+        tracing::debug!(target: "optd::optimizer::jobs", "Launching DSL engine for transformation rule '{}'", rule_name.0);
         let message_tx = self.message_tx.clone();
-        tokio::spawn(async move {
-            let response = engine
-                .launch(
-                    &rule_name.0,
-                    vec![plan],
-                    Arc::new(move |output| {
-                        Box::pin(async move {
-                            NewLogicalPartial(value_to_partial_logical(&output), group_id)
-                        })
-                    }),
-                )
-                .await;
+        let rule_name_clone = rule_name.0.clone();
+        let span = tracing::debug_span!(target: "optd::optimizer::jobs", "transform_job", job_id = ?job_id, rule_name = %rule_name_clone);
+        tokio::spawn(
+            async move {
+                let response = engine
+                    .launch(
+                        &rule_name.0,
+                        vec![plan],
+                        Arc::new(move |output| {
+                            Box::pin(async move {
+                                NewLogicalPartial(value_to_partial_logical(&output), group_id)
+                            })
+                        }),
+                    )
+                    .await;
 
-            // A none result means the rule was not applicable.
-            if !matches!(response, Return(Value { data: None, .. }, _)) {
-                Self::process_engine_response(job_id, message_tx, response).await;
+                // A none result means the rule was not applicable.
+                if !matches!(response, Return(Value { data: None, .. }, _)) {
+                    Self::process_engine_response(job_id, message_tx, response).await;
+                } else {
+                    tracing::debug!(target: "optd::optimizer::rules", rule_name = %rule_name.0, "Rule not applicable or returned None");
+                }
             }
-        });
+            .instrument(span),
+        );
 
         Ok(())
     }
 
@@ -160,22 +177,28 @@ impl Optimizer {
             Some(group_id),
         );
 
+        tracing::debug!(target: "optd::optimizer::jobs", "Launching DSL engine for implementation rule '{}'", rule_name.0);
         let message_tx = self.message_tx.clone();
-        tokio::spawn(async move {
-            let response = engine
-                .launch(
-                    &rule_name.0,
-                    vec![plan, properties],
-                    Arc::new(move |plan| {
-                        Box::pin(async move {
-                            NewPhysicalPartial(value_to_partial_physical(&plan), goal_id)
-                        })
-                    }),
-                )
-                .await;
+        let rule_name_clone = rule_name.0.clone();
+        let span = tracing::debug_span!(target: "optd::optimizer::jobs", "implement_job", job_id = ?job_id, rule_name = %rule_name_clone);
+        tokio::spawn(
+            async move {
+                let response = engine
+                    .launch(
+                        &rule_name.0,
+                        vec![plan, properties],
+                        Arc::new(move |plan| {
+                            Box::pin(async move {
+                                NewPhysicalPartial(value_to_partial_physical(&plan), goal_id)
+                            })
+                        }),
+                    )
+                    .await;
 
-            Self::process_engine_response(job_id, message_tx, response).await;
-        });
+                Self::process_engine_response(job_id, message_tx, response).await;
+            }
+            .instrument(span),
+        );
 
         Ok(())
     }
 
@@ -208,15 +231,22 @@ impl Optimizer {
             Some(group_id),
         );
 
+        tracing::debug!(target: "optd::optimizer::jobs", "Executing logical continuation");
         let message_tx = self.message_tx.clone();
-        tokio::spawn(async move {
-            let response = k.0(plan).await;
+        let span = tracing::debug_span!(target: "optd::optimizer::jobs", "continue_logical_job", job_id = ?job_id);
+        tokio::spawn(
+            async move {
+                let response = k.0(plan).await;
 
-            // A none result means the rule was not applicable.
-            if !matches!(response, Return(Value { data: None, .. }, _)) {
-                Self::process_engine_response(job_id, message_tx, response).await;
+                // A none result means the rule was not applicable.
+                if !matches!(response, Return(Value { data: None, .. }, _)) {
+                    Self::process_engine_response(job_id, message_tx, response).await;
+                } else {
+                    tracing::debug!(target: "optd::optimizer::jobs", "Logical continuation returned None or was not applicable");
+                }
             }
-        });
+            .instrument(span),
+        );
 
         Ok(())
     }
 
@@ -235,6 +265,7 @@ impl Optimizer {
         engine_tx: Sender<OptimizerMessage>,
         response: EngineResponse<EngineProduct>,
     ) {
+        tracing::trace!(target: "optd::optimizer::jobs", job_id = ?job_id, "Processing engine response");
         use EngineProduct::*;
         use EngineResponse::*;
 
@@ -247,10 +278,9 @@ impl Optimizer {
             YieldGoal(_, _) => todo!("Decide what to do here depending on the cost model"),
         };
 
-        engine_tx
-            .send(msg)
-            .await
-            .expect("Failed to send message - channel closed");
+        if let Err(e) = engine_tx.send(msg).await {
+            tracing::error!(target: "optd::optimizer::jobs", job_id = ?job_id, "Failed to send optimizer message from job: {}", e);
+        }
     }
 
     /// Helper function to create a new engine instance.
diff --git a/optd/src/optimizer/jobs/manage.rs b/optd/src/optimizer/jobs/manage.rs
index f517c583..3f28fe49 100644
--- a/optd/src/optimizer/jobs/manage.rs
+++ b/optd/src/optimizer/jobs/manage.rs
@@ -15,6 +15,7 @@ impl Optimizer {
     ///
     /// # Returns
     /// * The ID of the created job.
+    #[tracing::instrument(level = "debug", skip(self, kind), fields(task_id = ?task_id, job_kind = %std::any::type_name_of_val(&kind)), target = "optd::optimizer::tasks")]
     pub(crate) fn schedule_job(&mut self, task_id: TaskId, kind: JobKind) -> JobId {
         // Generate a new job ID.
let job_id = self.next_job_id; @@ -24,6 +25,7 @@ impl Optimizer { let job = Job(task_id, kind); self.pending_jobs.insert(job_id, job); self.job_schedule_queue.push_back(job_id); + tracing::debug!(target: "optd::optimizer::tasks", job_id = ?job_id, "Job scheduled"); job_id } @@ -36,6 +38,7 @@ impl Optimizer { /// /// # Returns /// * `Result<(), Error>` - Success or error during job launching. + #[tracing::instrument(level = "trace", skip(self), target = "optd::optimizer::tasks")] pub(crate) async fn launch_pending_jobs(&mut self) -> Result<(), M::MemoError> { use JobKind::*; @@ -44,6 +47,7 @@ impl Optimizer { && !self.job_schedule_queue.is_empty() { let job_id = self.job_schedule_queue.pop_back().unwrap(); + tracing::trace!(target: "optd::optimizer::tasks", job_id = ?job_id, "Launching pending job"); // Move the job from pending to running. let job = self.pending_jobs.remove(&job_id).unwrap(); diff --git a/optd/src/optimizer/merge/helpers.rs b/optd/src/optimizer/merge/helpers.rs index 68c10af5..77747e30 100644 --- a/optd/src/optimizer/merge/helpers.rs +++ b/optd/src/optimizer/merge/helpers.rs @@ -26,6 +26,13 @@ impl Optimizer { /// * `task_id` - The ID of the group exploration task to update. /// * `all_logical_exprs` - The complete set of logical expressions known for this group. /// * `principal` - Whether this task is the principal one (responsible for launching transforms). + #[tracing::instrument( + level = "debug", + skip_all, + name = "update_tasks_for_merged_group", + fields(task_id = ?task_id, principal), + target = "optd::optimizer::merge" + )] pub(super) async fn update_tasks( &mut self, task_id: TaskId, @@ -37,6 +44,11 @@ impl Optimizer { if new_exprs.is_empty() { return Ok(()); } + tracing::debug!( + target: "optd::optimizer::merge", + num_new_exprs = new_exprs.len(), + "Updating task with new expressions" + ); let (group_id, fork_tasks, optimize_goal_tasks) = { let task = self.get_explore_group_task(task_id).unwrap(); @@ -58,6 +70,10 @@ impl Optimizer { let continuation_tasks = self.create_logical_cont_tasks(&new_exprs, group_id, fork_task_id, &continuation); + tracing::trace!( + target: "optd::optimizer::merge", + "Adding {} continuation tasks to fork task {}", continuation_tasks.len(), fork_task_id.0 + ); self.get_fork_logical_task_mut(fork_task_id) .unwrap() .continue_with_logical_in @@ -74,6 +90,10 @@ impl Optimizer { let implement_tasks = self.create_implement_tasks(&new_exprs, goal_id, optimize_goal_id); + tracing::trace!( + target: "optd::optimizer::merge", + "Adding {} implement tasks to optimize goal task {}", implement_tasks.len(), optimize_goal_id.0 + ); self.get_optimize_goal_task_mut(optimize_goal_id) .unwrap() .implement_expression_in @@ -85,6 +105,10 @@ impl Optimizer { if principal { let transform_tasks = self.create_transform_tasks(&new_exprs, group_id, task_id); + tracing::trace!( + target: "optd::optimizer::merge", + "Adding {} transform tasks to explore group task {}", transform_tasks.len(), task_id.0 + ); self.get_explore_group_task_mut(task_id) .unwrap() .transform_expr_in @@ -125,6 +149,13 @@ impl Optimizer { /// # Arguments /// * `principal_task_id` - The ID of the task to retain as canonical. /// * `secondary_task_ids` - All other task IDs to merge into the principal. 
+ #[tracing::instrument( + level = "debug", + skip_all, + name = "consolidate_explore_group_tasks", + fields(principal_task_id = ?principal_task_id, num_secondaries = secondary_task_ids.len()), + target = "optd::optimizer::merge" + )] pub(super) async fn consolidate_group_explore( &mut self, principal_task_id: TaskId, @@ -137,6 +168,10 @@ impl Optimizer { let fork_tasks = std::mem::take(&mut task.fork_logical_out); let goal_tasks = std::mem::take(&mut task.optimize_goal_out); + tracing::trace!( + target: "optd::optimizer::merge", + "Deleting secondary explore group task {} and moving {} fork and {} goal tasks", task_id.0, fork_tasks.len(), goal_tasks.len() + ); self.delete_task(task_id); fork_tasks.into_iter().for_each(|fork_id| { @@ -169,6 +204,13 @@ impl Optimizer { /// # Arguments /// * `principal_task_id` - The ID of the task to retain as canonical. /// * `secondary_task_ids` - All other task IDs to merge into the principal. + #[tracing::instrument( + level = "debug", + skip_all, + name = "consolidate_optimize_goal_tasks", + fields(principal_task_id = ?principal_task_id, num_secondaries = secondary_task_ids.len()), + target = "optd::optimizer::merge" + )] pub(super) async fn consolidate_goal_optimize( &mut self, principal_task_id: TaskId, @@ -182,6 +224,10 @@ impl Optimizer { let optimize_in_tasks = std::mem::take(&mut task.optimize_goal_in); let optimize_plan_tasks = std::mem::take(&mut task.optimize_plan_out); + tracing::trace!( + target: "optd::optimizer::merge", + "Deleting secondary optimize goal task {} and moving dependencies", task_id.0 + ); self.delete_task(task_id); optimize_out_tasks.into_iter().for_each(|optimize_id| { @@ -224,6 +270,13 @@ impl Optimizer { /// /// # Arguments /// * `task_id` - The ID of the group exploration task to deduplicate. + #[tracing::instrument( + level = "debug", + skip_all, + name = "deduplicate_group_tasks", + fields(task_id = ?task_id), + target = "optd::optimizer::merge" + )] pub(super) async fn dedup_tasks(&mut self, task_id: TaskId) -> Result<(), M::MemoError> { let task = self.get_explore_group_task_mut(task_id).unwrap(); let old_exprs = std::mem::take(&mut task.dispatched_exprs); @@ -243,6 +296,10 @@ impl Optimizer { } } + tracing::trace!( + target: "optd::optimizer::merge", + "Deduplicating tasks: {} unique expressions, {} duplicates to delete", seen.len(), dups.len() + ); (seen, dups) }; diff --git a/optd/src/optimizer/merge/mod.rs b/optd/src/optimizer/merge/mod.rs index 8d2bb618..c47e471e 100644 --- a/optd/src/optimizer/merge/mod.rs +++ b/optd/src/optimizer/merge/mod.rs @@ -14,11 +14,16 @@ impl Optimizer { /// /// # Returns /// * `Result<(), OptimizeError>` - Success or an error that occurred during processing. + #[tracing::instrument(level = "info", skip(self, result), target = "optd::optimizer::merge")] pub(super) async fn handle_merge_result( &mut self, result: MergeProducts, ) -> Result<(), M::MemoError> { - self.handle_group_merges(&result.group_merges).await?; + tracing::debug!( + target: "optd::optimizer::merge", + "Handling {} group merges and {} goal merges", result.group_merges.len(), result.goal_merges.len() + ); + self.handle_group_merges(result.group_merges).await?; self.handle_goal_merges(&result.goal_merges).await } @@ -50,15 +55,17 @@ impl Optimizer { /// /// # Returns /// * `Result<(), OptimizeError>` - Success or an error that occurred during processing. 
+ #[tracing::instrument(level = "debug", skip_all, name = "handle_group_merges", target = "optd::optimizer::merge")] async fn handle_group_merges( &mut self, - group_merges: &[MergeGroupProduct], + group_merges: Vec<MergeGroupProduct>, ) -> Result<(), M::MemoError> { for MergeGroupProduct { new_group_id, merged_groups, } in group_merges { + tracing::debug!(target: "optd::optimizer::merge", ?new_group_id, num_merged_groups = merged_groups.len(), "Processing group merge product"); // For each merged group, get all group exploration tasks. // We don't need to check for the new group ID since it is guaranteed to be new. let group_explore_tasks: Vec<_> = merged_groups @@ -67,7 +74,8 @@ impl Optimizer { .collect(); if !group_explore_tasks.is_empty() { + tracing::debug!(target: "optd::optimizer::merge", ?new_group_id, num_related_explore_tasks = group_explore_tasks.len(), "Found related group exploration tasks"); - let all_logical_exprs = self.memo.get_all_logical_exprs(*new_group_id).await?; + let all_logical_exprs = self.memo.get_all_logical_exprs(new_group_id).await?; let (principal_task_id, secondary_task_ids) = group_explore_tasks.split_first().unwrap(); @@ -78,17 +86,19 @@ impl Optimizer { self.dedup_tasks(*task_id).await?; // Step 2: Send *new* logical expressions to each task. let is_principal = task_id == principal_task_id; + tracing::debug!(target: "optd::optimizer::merge", task_id = ?task_id, is_principal, "Updating task with new expressions from merged group"); self.update_tasks(*task_id, &all_logical_exprs, is_principal) .await?; } // Step 3: Consolidate all dependent tasks into the new "representative" task. + tracing::debug!(target: "optd::optimizer::merge", ?principal_task_id, num_secondary_tasks = secondary_task_ids.len(), "Consolidating group exploration tasks"); self.consolidate_group_explore(*principal_task_id, secondary_task_ids) .await; // Step 4: Set the index to point to the new representative task. self.group_exploration_task_index - .insert(*new_group_id, *principal_task_id); + .insert(new_group_id, *principal_task_id); } } @@ -116,6 +126,7 @@ impl Optimizer { /// /// # Returns /// * `Result<(), OptimizeError>` - Success or an error that occurred during processing. + #[tracing::instrument(level = "debug", skip_all, name = "handle_goal_merges", target = "optd::optimizer::merge")] async fn handle_goal_merges( &mut self, goal_merges: &[MergeGoalProduct], ) -> Result<(), M::MemoError> { for MergeGoalProduct { new_goal_id, merged_goals, } in goal_merges { + tracing::debug!(target: "optd::optimizer::merge", ?new_goal_id, num_merged_goals = merged_goals.len(), "Processing goal merge product"); // For each merged goal, get all goal optimization tasks. // We don't need to check for the new goal ID since it is guaranteed to be new. let goal_optimize_tasks: Vec<_> = merged_goals @@ -133,6 +145,7 @@ impl Optimizer { .collect(); if !goal_optimize_tasks.is_empty() { + tracing::debug!(target: "optd::optimizer::merge", ?new_goal_id, num_related_optimize_tasks = goal_optimize_tasks.len(), "Found related goal optimization tasks"); let (principal_task_id, secondary_task_ids) = goal_optimize_tasks.split_first().unwrap(); @@ -140,6 +153,7 @@ impl Optimizer { // handled in the `handle_group_merges` method, so we don't need to do it here. // Step 1: Consolidate all dependent tasks into the new "representative" task.
+ tracing::debug!(target: "optd::optimizer::merge", ?principal_task_id, num_secondary_tasks = secondary_task_ids.len(), "Consolidating goal optimization tasks"); self.consolidate_goal_optimize(*principal_task_id, secondary_task_ids) .await; diff --git a/optd/src/optimizer/mod.rs b/optd/src/optimizer/mod.rs index 45d42ac7..3ff417f8 100644 --- a/optd/src/optimizer/mod.rs +++ b/optd/src/optimizer/mod.rs @@ -8,6 +8,7 @@ use hashbrown::{HashMap, HashSet}; use hir_cir::extract_rulebook_from_hir; use std::{collections::VecDeque, sync::Arc}; use tokio::sync::mpsc::{self, Receiver, Sender}; +use tracing::Instrument; mod handlers; pub mod hir_cir; @@ -203,56 +204,97 @@ impl Optimizer { client_rx, ); - tokio::spawn(async move { - // TODO: If an error occurs we could restart or reboot the memo. - // Rather than failing (e.g. memo could be distributed). - optimizer.run().await.expect("Optimizer failure"); - }); + tokio::spawn( + async move { + // TODO: If an error occurs we could restart or reboot the memo. + // Rather than failing (e.g. memo could be distributed). + optimizer.run().await.expect("Optimizer failure"); + } + .instrument(tracing::info_span!(target: "optd::optimizer", "optimizer_thread")), + ); client_tx } /// Run the optimizer's main processing loop. + #[tracing::instrument( + level = "debug", + name = "optimizer_run_loop", + skip(self), + target = "optd::optimizer" + )] async fn run(mut self) -> Result<(), M::MemoError> { use ClientRequest::*; use EngineProduct::*; - use OptimizerMessage::*; + use OptimizerMessage as OmMsg; // Alias the message enum; its variants are matched below with an explicit `OmMsg::` prefix for clarity. loop { tokio::select! { Some(client_request) = self.client_rx.recv() => { + // Declare the late-bound fields as Empty up front; `Span::record` silently drops fields that were not declared when the span was created. + let req_span = tracing::info_span!(target: "optd::optimizer", "process_client_request", request_type = tracing::field::Empty, plan_root_op = tracing::field::Empty, client_task_id = tracing::field::Empty); + let message_tx = self.message_tx.clone(); match client_request { Optimize(optimize_request) => { + // Add plan_root_op for better context + let plan_root_op = optimize_request.plan.0.tag.clone(); + req_span.record("plan_root_op", tracing::field::display(&plan_root_op)); + // Create a task for the optimization request. let task_id = self.create_optimize_plan_task( optimize_request.plan.clone(), optimize_request.physical_tx.clone() ); + req_span.record("request_type", tracing::field::display("Optimize")); + req_span.record("client_task_id", tracing::field::debug(task_id)); // Forward the client request to the message processing loop - // in a new coroutine to avoid a deadlock. - tokio::spawn(async move { - message_tx.send(Request(optimize_request, task_id)) - .await - .expect("Failed to forward client request - channel closed"); - }); + // in a new coroutine to avoid a deadlock. + tokio::spawn( + async move { + if let Err(e) = message_tx.send(OmMsg::Request(optimize_request, task_id)).await { + tracing::error!(target: "optd::optimizer", "Failed to forward client request: {}", e); + } + }.instrument(req_span) // Instrumenting the spawn to ensure its logs are associated with the parent request span.
+ + ); }, DumpMemo => { - self.memo.debug_dump().await?; + req_span.record("request_type", tracing::field::display("DumpMemo")); + async { + tracing::info!(target: "optd::optimizer", "Processing dump memo request"); + self.memo.debug_dump().await + } + .instrument(req_span) + .await?; } } }, Some(message) = self.message_rx.recv() => { + // Declare the late-bound fields as Empty up front; `Span::record` silently drops fields that were not declared when the span was created. + let msg_span = tracing::info_span!(target: "optd::optimizer", "process_optimizer_message", task_id = tracing::field::Empty, plan_root_op = tracing::field::Empty, group_id = tracing::field::Empty, job_id = tracing::field::Empty); + // Do not hold an `enter()` guard across the `.await`s below (that would leave the span entered while other tasks run on this thread); record context on the span handle instead. + msg_span.in_scope(|| tracing::debug!(target: "optd::optimizer", "Received optimizer message")); + // Process the next message in the channel. match message { - Request(OptimizeRequest { plan, physical_tx }, task_id) => - self.process_optimize_request(plan, physical_tx, task_id).await?, - Retrieve(group_id, response_tx) => { + OmMsg::Request(OptimizeRequest { plan, physical_tx }, task_id) => { + // Add plan_root_op for better context + let plan_root_op = plan.0.tag.clone(); + msg_span.record("task_id", tracing::field::debug(task_id)); + msg_span.record("plan_root_op", tracing::field::display(&plan_root_op)); + tracing::debug!(target: "optd::optimizer", "Processing optimize request from internal queue"); + self.process_optimize_request(plan, physical_tx, task_id).instrument(msg_span.clone()).await?; + }, + OmMsg::Retrieve(group_id, response_tx) => { + msg_span.record("group_id", tracing::field::debug(group_id)); + tracing::debug!(target: "optd::optimizer", "Processing retrieve properties request"); self.process_retrieve_properties(group_id, response_tx).await?; }, - Product(product, job_id) => { + OmMsg::Product(product, job_id) => { + msg_span.record("job_id", tracing::field::debug(job_id)); + tracing::debug!(target: "optd::optimizer", "Processing job product"); let task_id = self.get_related_task_id(job_id); // Only process the product if the task is still active. @@ -271,19 +313,25 @@ impl Optimizer { self.process_group_subscription(group_id, continuation, job_id).await?; } } + } else { + tracing::warn!(target: "optd::optimizer", task_id = ?task_id, "Task processing skipped as it's no longer active"); } // A job is guaranteed to be terminated, unless it has been added to the pending queue. - if !self.pending_messages.iter().any(|msg| matches!(msg.message, Product(_, j) if j == job_id)) { + if !self.pending_messages.iter().any(|msg| matches!(msg.message, OmMsg::Product(_, j) if j == job_id)) { self.running_jobs.remove(&job_id); + tracing::debug!(target: "optd::optimizer::jobs", job_id = ?job_id, "Job completed and removed from running jobs"); } } }; // Launch pending jobs according to a policy (currently LIFO). - self.launch_pending_jobs().await?; + self.launch_pending_jobs().instrument(tracing::debug_span!(target: "optd::optimizer", "launch_pending_jobs_dispatch")).await?; }, - else => break Ok(()), + else => { + tracing::info!(target: "optd::optimizer", "Optimizer run loop finished - all channels closed"); + break Ok(()); + } } } } diff --git a/optd/src/optimizer/tasks/delete.rs b/optd/src/optimizer/tasks/delete.rs index 4bb93485..dff66c11 100644 --- a/optd/src/optimizer/tasks/delete.rs +++ b/optd/src/optimizer/tasks/delete.rs @@ -13,40 +13,50 @@ impl Optimizer { /// /// # Parameters /// * `task_id` - ID of the task to delete.
+ #[tracing::instrument(level = "debug", skip(self), fields(task_id = ?task_id), target = "optd::optimizer::tasks")] pub(crate) fn delete_task(&mut self, task_id: TaskId) { use Task::*; let task = self.get_task(task_id).cloned().unwrap(); + tracing::debug!(target: "optd::optimizer::tasks", task_type = %task.task_type(), "Starting task deletion"); + match &task { TransformExpression(task) => { + tracing::trace!(target: "optd::optimizer::tasks", explore_group_out = ?task.explore_group_out, "Removing transform task from explore group"); let explore_task = self .get_explore_group_task_mut(task.explore_group_out) .unwrap(); explore_task.transform_expr_in.remove(&task_id); if let Some(fork_id) = task.fork_in { + tracing::trace!(target: "optd::optimizer::tasks", fork_id = ?fork_id, "Recursively deleting fork task"); self.delete_task(fork_id); } } ImplementExpression(implement_expression_task) => { + tracing::trace!(target: "optd::optimizer::tasks", optimize_goal_out = ?implement_expression_task.optimize_goal_out, "Removing implement task from goal"); let optimize_goal_task = self .get_optimize_goal_task_mut(implement_expression_task.optimize_goal_out) .unwrap(); optimize_goal_task.implement_expression_in.remove(&task_id); if let Some(fork_id) = implement_expression_task.fork_in { + tracing::trace!(target: "optd::optimizer::tasks", fork_id = ?fork_id, "Recursively deleting fork task"); self.delete_task(fork_id); } } ContinueWithLogical(task) => { + tracing::trace!(target: "optd::optimizer::tasks", fork_out = ?task.fork_out, "Removing continue task from fork"); let fork_task = self.get_fork_logical_task_mut(task.fork_out).unwrap(); fork_task.continue_with_logical_in.remove(&task_id); if let Some(fork_id) = task.fork_in { + tracing::trace!(target: "optd::optimizer::tasks", fork_id = ?fork_id, "Recursively deleting fork task"); self.delete_task(fork_id); } } ForkLogical(task) => { + tracing::trace!(target: "optd::optimizer::tasks", explore_group_in = ?task.explore_group_in, "Removing fork task from explore group"); let explore_task = self .get_explore_group_task_mut(task.explore_group_in) .unwrap(); @@ -56,11 +66,13 @@ impl Optimizer { if explore_task.fork_logical_out.is_empty() && explore_task.optimize_goal_out.is_empty() { + tracing::debug!(target: "optd::optimizer::tasks", explore_task_id = ?task.explore_group_in, "Explore task has no more purpose, deleting"); self.delete_task(task.explore_group_in); } let continue_tasks: Vec<_> = task.continue_with_logical_in.iter().copied().collect(); + tracing::trace!(target: "optd::optimizer::tasks", num_continue_tasks = continue_tasks.len(), "Recursively deleting continue tasks"); for continue_id in continue_tasks { self.delete_task(continue_id); } @@ -69,10 +81,12 @@ impl Optimizer { assert!(task.fork_logical_out.is_empty()); assert!(task.optimize_goal_out.is_empty()); + tracing::trace!(target: "optd::optimizer::tasks", "Removing explore group task from index"); self.group_exploration_task_index .retain(|_, &mut v| v != task_id); let transform_tasks: Vec<_> = task.transform_expr_in.iter().copied().collect(); + tracing::trace!(target: "optd::optimizer::tasks", num_transform_tasks = transform_tasks.len(), "Recursively deleting transform tasks"); for transform_id in transform_tasks { self.delete_task(transform_id); } @@ -81,19 +95,25 @@ impl Optimizer { assert!(task.optimize_goal_out.is_empty()); assert!(task.optimize_plan_out.is_empty()); + tracing::trace!(target: "optd::optimizer::tasks", "Removing optimize goal task from index"); 
self.goal_optimization_task_index .retain(|_, &mut v| v != task_id); let implement_tasks: Vec<_> = task.implement_expression_in.iter().copied().collect(); + tracing::trace!(target: "optd::optimizer::tasks", num_implement_tasks = implement_tasks.len(), "Recursively deleting implement tasks"); for implement_id in implement_tasks { self.delete_task(implement_id); } } - OptimizePlan(_) => todo!(), + OptimizePlan(_) => { + tracing::warn!(target: "optd::optimizer::tasks", "OptimizePlan task deletion not yet implemented"); + todo!() + } } // Finally, remove the task from the task collection. + tracing::info!(target: "optd::optimizer::tasks", "Task deletion completed"); self.remove_task(task_id); } } diff --git a/optd/src/optimizer/tasks/launch.rs b/optd/src/optimizer/tasks/launch.rs index fddcd019..b3293c13 100644 --- a/optd/src/optimizer/tasks/launch.rs +++ b/optd/src/optimizer/tasks/launch.rs @@ -24,6 +24,7 @@ impl Optimizer { /// # Parameters /// * `plan`: The logical plan to be optimized. /// * `physical_tx`: The channel to send the optimized physical plans back. + #[tracing::instrument(level = "debug", skip(self, plan, physical_tx), fields(plan_root_op = %plan.0.tag), target = "optd::optimizer::tasks")] pub(crate) fn create_optimize_plan_task( &mut self, plan: LogicalPlan, @@ -32,6 +33,8 @@ impl Optimizer { use Task::*; let task_id = self.next_task_id(); + tracing::info!(target: "optd::optimizer::tasks", task_id = ?task_id, task_type = "OptimizePlan", "Task created"); + let optimize_plan_task = OptimizePlanTask { plan, physical_tx, @@ -50,13 +53,17 @@ impl Optimizer { /// # Parameters /// * `task_id`: The ID of the task to be launched. /// * `goal_id`: The goal ID that this task is optimizing for. + #[tracing::instrument(level = "debug", skip(self), fields(task_id = ?task_id, goal_id = ?goal_id), target = "optd::optimizer::tasks")] pub(crate) async fn launch_optimize_plan_task( &mut self, task_id: TaskId, goal_id: GoalId, ) -> Result<(), M::MemoError> { + tracing::debug!(target: "optd::optimizer::tasks", "Launching optimize plan task"); + // Launch goal optimize task if needed, and get its ID. let goal_optimize_task_id = self.ensure_optimize_goal_task(goal_id).await?; + tracing::debug!(target: "optd::optimizer::tasks", goal_optimize_task_id = ?goal_optimize_task_id, "Goal optimize task ensured"); // Register task and connect in graph. let optimize_plan_task = self.get_optimize_plan_task_mut(task_id).unwrap(); @@ -68,6 +75,7 @@ impl Optimizer { .optimize_plan_out .insert(task_id); + tracing::info!(target: "optd::optimizer::tasks", "Optimize plan task launched and connected to goal task"); Ok(()) } @@ -77,6 +85,7 @@ impl Optimizer { /// * `group_id`: The ID of the group to be explored. /// * `continuation`: The logical continuation to be used. /// * `parent_task_id`: The ID of the parent task. + #[tracing::instrument(level = "debug", skip(self, continuation), fields(group_id = ?group_id, parent_task_id = ?parent_task_id), target = "optd::optimizer::tasks")] pub(crate) async fn launch_fork_logical_task( &mut self, group_id: GroupId, @@ -86,9 +95,11 @@ impl Optimizer { use Task::*; let fork_task_id = self.next_task_id(); + tracing::info!(target: "optd::optimizer::tasks", task_id = ?fork_task_id, task_type = "ForkLogical", "Task created"); // Launch the group exploration task if needed, and get its ID. 
let explore_group_in = self.ensure_group_exploration_task(group_id).await?; + tracing::debug!(target: "optd::optimizer::tasks", explore_group_in = ?explore_group_in, "Group exploration task ensured"); // Create continuation tasks for all expressions. let expressions = self @@ -98,6 +109,7 @@ impl Optimizer { .clone(); let continue_with_logical_in = self.create_logical_cont_tasks(&expressions, group_id, fork_task_id, &continuation); + tracing::debug!(target: "optd::optimizer::tasks", num_expressions = expressions.len(), num_continuation_tasks = continue_with_logical_in.len(), "Continuation tasks created"); // Create the fork task. let fork_logical_task = ForkLogicalTask { @@ -114,6 +126,7 @@ impl Optimizer { .insert(fork_task_id); self.add_task(fork_task_id, ForkLogical(fork_logical_task)); + tracing::info!(target: "optd::optimizer::tasks", "Fork logical task launched and connected"); Ok(()) } @@ -128,6 +141,7 @@ impl Optimizer { /// /// # Returns /// * `TaskId`: The ID of the created continue task. + #[tracing::instrument(level = "debug", skip(self, continuation), fields(expr_id = ?expression_id, group_id = ?group_id, fork_out = ?fork_out), target = "optd::optimizer::tasks")] pub(crate) fn launch_continue_with_logical_task( &mut self, expression_id: LogicalExpressionId, @@ -138,6 +152,8 @@ impl Optimizer { use Task::*; let task_id = self.next_task_id(); + tracing::info!(target: "optd::optimizer::tasks", task_id = ?task_id, task_type = "ContinueWithLogical", "Task created"); + let task = ContinueWithLogicalTask { expression_id, fork_out, @@ -149,6 +165,7 @@ impl Optimizer { task_id, JobKind::ContinueWithLogical(expression_id, group_id, continuation), ); + tracing::debug!(target: "optd::optimizer::tasks", "Continue with logical task launched and job scheduled"); task_id } @@ -163,6 +180,7 @@ impl Optimizer { /// /// # Returns /// * `TaskId`: The ID of the created transform task. + #[tracing::instrument(level = "debug", skip(self), fields(expr_id = ?expr_id, rule = %rule.0, explore_group_out = ?explore_group_out, group_id = ?group_id), target = "optd::optimizer::tasks")] pub(crate) fn launch_transform_expression_task( &mut self, expr_id: LogicalExpressionId, @@ -173,6 +191,8 @@ impl Optimizer { use Task::*; let task_id = self.next_task_id(); + tracing::info!(target: "optd::optimizer::tasks", task_id = ?task_id, task_type = "TransformExpression", "Task created"); + let task = TransformExpressionTask { _rule: rule.clone(), expression_id: expr_id, @@ -185,6 +205,7 @@ impl Optimizer { task_id, JobKind::TransformExpression(rule, expr_id, group_id), ); + tracing::debug!(target: "optd::optimizer::tasks", "Transform expression task launched and job scheduled"); task_id } @@ -199,6 +220,7 @@ impl Optimizer { /// /// # Returns /// * `TaskId`: The ID of the created implement task. 
+ #[tracing::instrument(level = "debug", skip(self), fields(expr_id = ?expr_id, rule = %rule.0, optimize_goal_out = ?optimize_goal_out, goal_id = ?goal_id), target = "optd::optimizer::tasks")] pub(crate) fn launch_implement_expression_task( &mut self, expr_id: LogicalExpressionId, @@ -209,6 +231,8 @@ impl Optimizer { use Task::*; let task_id = self.next_task_id(); + tracing::info!(target: "optd::optimizer::tasks", task_id = ?task_id, task_type = "ImplementExpression", "Task created"); + let task = ImplementExpressionTask { _rule: rule.clone(), expression_id: expr_id, @@ -221,6 +245,7 @@ impl Optimizer { task_id, JobKind::ImplementExpression(rule, expr_id, goal_id), ); + tracing::debug!(target: "optd::optimizer::tasks", "Implement expression task launched and job scheduled"); task_id } @@ -234,6 +259,7 @@ impl Optimizer { /// /// # Returns /// * `HashSet<TaskId>` - The IDs of all created transform tasks + #[tracing::instrument(level = "debug", skip(self, expressions), fields(num_expressions = expressions.len(), group_id = ?group_id, explore_task_id = ?explore_task_id), target = "optd::optimizer::tasks")] pub(crate) fn create_transform_tasks( &mut self, expressions: &HashSet<LogicalExpressionId>, @@ -243,6 +269,8 @@ impl Optimizer { let transformations = self.rule_book.get_transformations().to_vec(); let mut transform_tasks = HashSet::new(); + tracing::debug!(target: "optd::optimizer::tasks", num_rules = transformations.len(), "Creating transform tasks for expressions"); + for &expr_id in expressions { for rule in &transformations { let task_id = self.launch_transform_expression_task( @@ -255,6 +283,7 @@ impl Optimizer { } } + tracing::info!(target: "optd::optimizer::tasks", num_tasks_created = transform_tasks.len(), "Transform tasks created"); transform_tasks } @@ -267,6 +296,7 @@ impl Optimizer { /// /// # Returns /// * `HashSet<TaskId>` - The IDs of all created implement tasks + #[tracing::instrument(level = "debug", skip(self, expressions), fields(num_expressions = expressions.len(), goal_id = ?goal_id, optimize_task_id = ?optimize_task_id), target = "optd::optimizer::tasks")] pub(crate) fn create_implement_tasks( &mut self, expressions: &HashSet<LogicalExpressionId>, @@ -276,6 +306,8 @@ impl Optimizer { let implementations = self.rule_book.get_implementations().to_vec(); let mut implement_tasks = HashSet::new(); + tracing::debug!(target: "optd::optimizer::tasks", num_rules = implementations.len(), "Creating implement tasks for expressions"); + for &expr_id in expressions { for rule in &implementations { let task_id = self.launch_implement_expression_task( @@ -288,6 +320,7 @@ impl Optimizer { } } + tracing::info!(target: "optd::optimizer::tasks", num_tasks_created = implement_tasks.len(), "Implement tasks created"); implement_tasks } @@ -335,6 +368,7 @@ impl Optimizer { /// /// # Returns /// * `TaskId`: The ID of the task that was created or reused. + #[tracing::instrument(level = "debug", skip(self), fields(group_id = ?group_id), target = "optd::optimizer::tasks")] pub(crate) async fn ensure_group_exploration_task( &mut self, group_id: GroupId, @@ -343,16 +377,21 @@ impl Optimizer { // Find the representative group for the given group ID. let group_repr = self.memo.find_repr_group_id(group_id).await?; + tracing::debug!(target: "optd::optimizer::tasks", group_repr = ?group_repr, "Found representative group"); // Check if we already have an exploration task for this group.
if let Some(task_id) = self.group_exploration_task_index.get(&group_repr) { + tracing::debug!(target: "optd::optimizer::tasks", task_id = ?task_id, "Reusing existing group exploration task"); return Ok(*task_id); } let exploration_task_id = self.next_task_id(); + tracing::info!(target: "optd::optimizer::tasks", task_id = ?exploration_task_id, task_type = "ExploreGroup", "Creating new group exploration task"); // Create transform expression tasks for each logical expression and rule combination. let logical_expressions = self.memo.get_all_logical_exprs(group_repr).await?; + tracing::debug!(target: "optd::optimizer::tasks", num_expressions = logical_expressions.len(), "Retrieved logical expressions for group"); + let transform_expr_in = self.create_transform_tasks(&logical_expressions, group_id, exploration_task_id); @@ -370,6 +409,7 @@ impl Optimizer { self.group_exploration_task_index .insert(group_repr, exploration_task_id); + tracing::info!(target: "optd::optimizer::tasks", "Group exploration task created and indexed"); Ok(exploration_task_id) } @@ -382,6 +422,7 @@ impl Optimizer { /// # Returns /// * `TaskId`: The ID of the task that was created or reused. #[async_recursion] + #[tracing::instrument(level = "debug", skip(self), fields(goal_id = ?goal_id), target = "optd::optimizer::tasks")] pub(crate) async fn ensure_optimize_goal_task( &mut self, goal_id: GoalId, @@ -390,19 +431,27 @@ impl Optimizer { // Find the representative goal for the given goal ID. let goal_repr = self.memo.find_repr_goal_id(goal_id).await?; + tracing::debug!(target: "optd::optimizer::tasks", goal_repr = ?goal_repr, "Found representative goal"); // Check if we already have an optimization task for this goal. if let Some(task_id) = self.goal_optimization_task_index.get(&goal_repr) { + tracing::debug!(target: "optd::optimizer::tasks", task_id = ?task_id, "Reusing existing goal optimization task"); return Ok(*task_id); } let goal_optimize_task_id = self.next_task_id(); + tracing::info!(target: "optd::optimizer::tasks", task_id = ?goal_optimize_task_id, task_type = "OptimizeGoal", "Creating new goal optimization task"); let Goal(group_id, _) = self.memo.materialize_goal(goal_id).await?; + tracing::debug!(target: "optd::optimizer::tasks", group_id = ?group_id, "Materialized goal to get group"); + let explore_group_in = self.ensure_group_exploration_task(group_id).await?; + tracing::debug!(target: "optd::optimizer::tasks", explore_group_in = ?explore_group_in, "Group exploration task ensured"); // Launch all implementation tasks. let expressions = self.memo.get_all_logical_exprs(group_id).await?; + tracing::debug!(target: "optd::optimizer::tasks", num_expressions = expressions.len(), "Retrieved expressions for implementation"); + let implement_expression_in = self.create_implement_tasks(&expressions, goal_id, goal_optimize_task_id); @@ -425,6 +474,7 @@ impl Optimizer { self.add_task(goal_optimize_task_id, OptimizeGoal(goal_optimize_task)); self.goal_optimization_task_index .insert(goal_repr, goal_optimize_task_id); + tracing::debug!(target: "optd::optimizer::tasks", "Goal optimization task registered and indexed"); // Ensure sub-goals are getting explored too, we do this after registering // the task to avoid infinite recursion. 
@@ -436,11 +486,15 @@ impl Optimizer { .filter_map(|member_id| match member_id { GoalMemberId::GoalId(sub_goal_id) => Some(sub_goal_id), _ => None, - }); + }) + .collect::<Vec<_>>(); + + tracing::debug!(target: "optd::optimizer::tasks", num_subgoals = sub_goals.len(), "Found sub-goals to optimize"); // Launch optimization tasks for each subgoal and establish links. let mut subgoal_task_ids = HashSet::new(); for sub_goal_id in sub_goals { + tracing::trace!(target: "optd::optimizer::tasks", sub_goal_id = ?sub_goal_id, "Ensuring sub-goal optimization task"); let sub_goal_task_id = self.ensure_optimize_goal_task(sub_goal_id).await?; subgoal_task_ids.insert(sub_goal_task_id); @@ -452,10 +506,12 @@ impl Optimizer { } // Update the parent task with links to subgoals. + let num_subgoals = subgoal_task_ids.len(); self.get_optimize_goal_task_mut(goal_optimize_task_id) .unwrap() .optimize_goal_in = subgoal_task_ids; + tracing::info!(target: "optd::optimizer::tasks", num_subgoals, "Goal optimization task created with sub-goal dependencies"); Ok(goal_optimize_task_id) } } diff --git a/optd/src/optimizer/tasks/manage.rs b/optd/src/optimizer/tasks/manage.rs index b80119b7..14661839 100644 --- a/optd/src/optimizer/tasks/manage.rs +++ b/optd/src/optimizer/tasks/manage.rs @@ -23,13 +23,22 @@ impl Optimizer { } /// Add a new task. + #[tracing::instrument(level = "debug", skip(self, task), fields(task_id = ?task_id, task_type = %task.task_type()), target = "optd::optimizer::tasks")] pub(crate) fn add_task(&mut self, task_id: TaskId, task: Task) { + tracing::trace!(target: "optd::optimizer::tasks", "Adding task to manager"); self.tasks.insert(task_id, task); } /// Remove a task. + #[tracing::instrument(level = "debug", skip(self), fields(task_id = ?task_id), target = "optd::optimizer::tasks")] pub(crate) fn remove_task(&mut self, task_id: TaskId) -> Option<Task> { - self.tasks.remove(&task_id) + let task = self.tasks.remove(&task_id); + if let Some(ref task) = task { + tracing::debug!(target: "optd::optimizer::tasks", task_type = %task.task_type(), "Task removed from manager"); + } else { + tracing::warn!(target: "optd::optimizer::tasks", "Attempted to remove non-existent task"); + } + task } // Direct task type access by ID - immutable versions diff --git a/optd/src/optimizer/tasks/mod.rs b/optd/src/optimizer/tasks/mod.rs index d6fcc498..8ad689ca 100644 --- a/optd/src/optimizer/tasks/mod.rs +++ b/optd/src/optimizer/tasks/mod.rs @@ -158,3 +158,18 @@ pub(crate) struct ContinueWithLogicalTask { /// Potential `ForkLogicalTask` fork spawned off from this task. pub fork_in: Option<TaskId>, } + +impl Task { + /// Returns a string representation of the task type for logging purposes. + pub(crate) fn task_type(&self) -> &'static str { + match self { + Task::OptimizePlan(_) => "OptimizePlan", + Task::OptimizeGoal(_) => "OptimizeGoal", + Task::ExploreGroup(_) => "ExploreGroup", + Task::ImplementExpression(_) => "ImplementExpression", + Task::TransformExpression(_) => "TransformExpression", + Task::ForkLogical(_) => "ForkLogical", + Task::ContinueWithLogical(_) => "ContinueWithLogical", + } + } +}
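Note on the recurring `tokio::spawn(fut.instrument(span))` shape in this patch: tracing's span context is propagated per-task, so a future handed to `tokio::spawn` does not inherit the span that was current at the spawn site; it must carry its own span via `Instrument::instrument`. A minimal sketch of the pattern (the helper name and `job_id` field are illustrative, not taken from the patch):

    use tracing::Instrument;

    // Create the span on the spawning side, then attach it to the future.
    // Every event emitted while the future is polled is recorded inside it.
    fn spawn_traced(job_id: u64) {
        let span = tracing::debug_span!(target: "optd::optimizer::jobs", "example_job", job_id);
        tokio::spawn(
            async move {
                tracing::debug!(target: "optd::optimizer::jobs", "job running");
            }
            .instrument(span),
        );
    }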
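Note on `Span::record`: tracing silently ignores a `record` call for any field that was not declared when the span was created, which is why the request- and message-processing spans above declare their late-bound fields as `tracing::field::Empty` up front. A short sketch of the pattern (span and field names illustrative):

    use tracing::field::Empty;

    fn record_later() {
        // Declare the fields now, fill them in once the values are known.
        let span = tracing::info_span!("process_request", request_type = Empty, task_id = Empty);
        span.record("request_type", "Optimize");
        span.record("task_id", 42_u64);
    }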
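Note on why the message-processing arm avoids `let _guard = msg_span.enter();`: a guard returned by `Span::enter` that is held across an `.await` leaves the span marked as entered on the worker thread while the task is suspended, so events from unrelated tasks polled on that thread get attributed to it. The async-safe tools are `Span::in_scope` for synchronous stretches and `Instrument::instrument` for futures, sketched below:

    use tracing::Instrument;

    async fn handle_message(span: tracing::Span) {
        // Synchronous work: the span is entered and exited within a single poll.
        span.in_scope(|| tracing::debug!("received message"));

        // Asynchronous work: attach the span to the future instead of
        // holding an `enter()` guard across the `.await`.
        async { tracing::debug!("processing") }
            .instrument(span.clone())
            .await;
    }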
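Note on the `#[tracing::instrument]` attributes: by default the macro records every function argument with its `fmt::Debug` implementation, which both requires `Debug` on `self` and can be noisy, so the patch consistently opts out with `skip(...)`/`skip_all` and re-adds the useful values through `fields(...)`. A condensed sketch (function and field names illustrative):

    // `skip_all` suppresses automatic argument capture; `fields` re-adds
    // only what is worth keeping, using `?` (Debug) or `%` (Display).
    #[tracing::instrument(level = "debug", skip_all, fields(task_id = ?task_id), target = "optd::optimizer::tasks")]
    fn delete_one(task_id: u64) {
        tracing::debug!(target: "optd::optimizer::tasks", "deleting");
    }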