From 531b594cdafbc60531fa0dd4f80551c88311be29 Mon Sep 17 00:00:00 2001 From: skalwaghe-56 Date: Mon, 6 Oct 2025 17:56:43 +0530 Subject: [PATCH] feat: collector automatically merge and align multiple collect() called with different schema --- src/builder/analyzer.rs | 107 ++++++++++++++++++++++++++++++++----- src/builder/plan.rs | 6 +++ src/execution/evaluator.rs | 53 ++++++++++++------ 3 files changed, 137 insertions(+), 29 deletions(-) diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index fa2b131f..5447d04b 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -255,14 +255,71 @@ fn try_merge_collector_schemas( schema1: &CollectorSchema, schema2: &CollectorSchema, ) -> Result { - let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?; + // Union all fields from both schemas + let mut field_map: HashMap = HashMap::new(); + + // Add fields from schema1 + for field in &schema1.fields { + field_map.insert(field.name.clone(), field.value_type.clone()); + } + + // Merge fields from schema2 + for field in &schema2.fields { + if let Some(existing_type) = field_map.get(&field.name) { + // Try to merge types if they differ + let merged_type = try_make_common_value_type(existing_type, &field.value_type)?; + field_map.insert(field.name.clone(), merged_type); + } else { + field_map.insert(field.name.clone(), field.value_type.clone()); + } + } + + // Sort fields by name for consistent ordering, but prioritize UUID fields + let mut fields: Vec = field_map + .into_iter() + .map(|(name, value_type)| FieldSchema { + name, + value_type, + description: None, + }) + .collect(); + + // Prioritize UUID fields by placing them at the beginning for efficiency + fields.sort_by(|a, b| { + let a_is_uuid = matches!(a.value_type.typ, ValueType::Basic(BasicValueType::Uuid)); + let b_is_uuid = matches!(b.value_type.typ, ValueType::Basic(BasicValueType::Uuid)); + + match (a_is_uuid, b_is_uuid) { + (true, false) => std::cmp::Ordering::Less, // UUID fields first + (false, true) => std::cmp::Ordering::Greater, // UUID fields first + _ => a.name.cmp(&b.name), // Then alphabetical + } + }); + + // Handle auto_uuid_field_idx (UUID fields are now at position 0 for efficiency) + let auto_uuid_field_idx = match (schema1.auto_uuid_field_idx, schema2.auto_uuid_field_idx) { + (Some(idx1), Some(idx2)) => { + let name1 = &schema1.fields[idx1].name; + let name2 = &schema2.fields[idx2].name; + if name1 == name2 { + // UUID fields are prioritized to position 0, so check if first field is UUID + if fields.first().map_or(false, |f| { + matches!(f.value_type.typ, ValueType::Basic(BasicValueType::Uuid)) + }) { + Some(0) + } else { + fields.iter().position(|f| &f.name == name1) + } + } else { + None // Different auto_uuid fields, disable + } + } + _ => None, // If either doesn't have it, or both don't, disable + }; + Ok(CollectorSchema { fields, - auto_uuid_field_idx: if schema1.auto_uuid_field_idx == schema2.auto_uuid_field_idx { - schema1.auto_uuid_field_idx - } else { - None - }, + auto_uuid_field_idx, }) } @@ -803,16 +860,42 @@ impl AnalyzerContext { let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, op_scope)?; let has_auto_uuid_field = op.auto_uuid_field.is_some(); let fingerprinter = Fingerprinter::default().with(&fields_schema)?; + let input_field_names: Vec = + fields_schema.iter().map(|f| f.name.clone()).collect(); + let collector_ref = add_collector( + &op.scope_name, + op.collector_name.clone(), + CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()), + op_scope, + )?; + // Get the merged collector schema after adding + let collector_schema: Arc = { + let scope = find_scope(&op.scope_name, op_scope)?.1; + let states = scope.states.lock().unwrap(); + let collector = states.collectors.get(&op.collector_name).unwrap(); + collector.schema.clone() + }; + + // Pre-compute field index mappings for efficient evaluation + let field_index_mapping: Vec = input_field_names + .iter() + .map(|field_name| { + collector_schema + .fields + .iter() + .position(|f| &f.name == field_name) + .unwrap_or(usize::MAX) + }) + .collect(); + let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp { name: reactive_op.name.clone(), has_auto_uuid_field, input: struct_mapping, - collector_ref: add_collector( - &op.scope_name, - op.collector_name.clone(), - CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()), - op_scope, - )?, + input_field_names, + collector_schema, + collector_ref, + field_index_mapping, fingerprinter, }); async move { Ok(collect_op) }.boxed() diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 88d08c60..ed48da67 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -1,4 +1,5 @@ use crate::base::schema::FieldSchema; +use crate::base::spec::FieldName; use crate::prelude::*; use crate::ops::interface::*; @@ -90,7 +91,12 @@ pub struct AnalyzedCollectOp { pub name: String, pub has_auto_uuid_field: bool, pub input: AnalyzedStructMapping, + pub input_field_names: Vec, + pub collector_schema: Arc, pub collector_ref: AnalyzedCollectorReference, + /// Pre-computed mapping from input field index to collector field index. + /// For missing fields, the value is usize::MAX. + pub field_index_mapping: Vec, /// Fingerprinter of the collector's schema. Used to decide when to reuse auto-generated UUIDs. pub fingerprinter: Fingerprinter, } diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs index 471c9995..251a1bba 100644 --- a/src/execution/evaluator.rs +++ b/src/execution/evaluator.rs @@ -483,26 +483,45 @@ async fn evaluate_op_scope( } AnalyzedReactiveOp::Collect(op) => { - let mut field_values = Vec::with_capacity( - op.input.fields.len() + if op.has_auto_uuid_field { 1 } else { 0 }, - ); - let field_values_iter = assemble_input_values(&op.input.fields, scoped_entries); - if op.has_auto_uuid_field { - field_values.push(value::Value::Null); - field_values.extend(field_values_iter); - let uuid = memory.next_uuid( - op.fingerprinter - .clone() - .with(&field_values[1..])? - .into_fingerprint(), - )?; - field_values[0] = value::Value::Basic(value::BasicValue::Uuid(uuid)); - } else { - field_values.extend(field_values_iter); - }; let collector_entry = scoped_entries .headn(op.collector_ref.scope_up_level as usize) .ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?; + + // Assemble input values + let input_values: Vec = + assemble_input_values(&op.input.fields, scoped_entries).collect(); + + // Create field_values vector for all fields in the merged schema + let mut field_values: Vec = + vec![value::Value::Null; op.collector_schema.fields.len()]; + + // Use pre-computed field index mappings for O(1) field placement + for (i, &collector_field_idx) in op.field_index_mapping.iter().enumerate() { + if collector_field_idx != usize::MAX { + field_values[collector_field_idx] = input_values[i].clone(); + } + } + + // Handle auto_uuid_field (assumed to be at position 0 for efficiency) + if op.has_auto_uuid_field { + if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx { + let uuid = memory.next_uuid( + op.fingerprinter + .clone() + .with( + &field_values + .iter() + .enumerate() + .filter(|(i, _)| *i != uuid_idx) + .map(|(_, v)| v) + .collect::>(), + )? + .into_fingerprint(), + )?; + field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid)); + } + } + { let mut collected_records = collector_entry.collected_values [op.collector_ref.local.collector_idx as usize]