diff --git a/Cargo.lock b/Cargo.lock index ae2dace2084f..387717d66bfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -247,8 +247,7 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1bb018b6960c87fd9d025009820406f74e83281185a8bdcb44880d2aa5c9a87" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "arrow-arith", "arrow-array", @@ -271,8 +270,7 @@ dependencies = [ [[package]] name = "arrow-arith" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44de76b51473aa888ecd6ad93ceb262fb8d40d1f1154a4df2f069b3590aa7575" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "arrow-array", "arrow-buffer", @@ -285,8 +283,7 @@ dependencies = [ [[package]] name = "arrow-array" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29ed77e22744475a9a53d00026cf8e166fe73cf42d89c4c4ae63607ee1cfcc3f" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "ahash 0.8.12", "arrow-buffer", @@ -302,8 +299,7 @@ dependencies = [ [[package]] name = "arrow-buffer" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0391c96eb58bf7389171d1e103112d3fc3e5625ca6b372d606f2688f1ea4cce" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "bytes", "half", @@ -313,8 +309,7 @@ dependencies = [ [[package]] name = "arrow-cast" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f39e1d774ece9292697fcbe06b5584401b26bd34be1bec25c33edae65c2420ff" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "arrow-array", "arrow-buffer", @@ -334,8 +329,7 @@ dependencies = [ [[package]] name = "arrow-csv" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9055c972a07bf12c2a827debfd34f88d3b93da1941d36e1d9fee85eebe38a12a" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "arrow-array", "arrow-cast", @@ -350,8 +344,7 @@ dependencies = [ [[package]] name = "arrow-data" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf75ac27a08c7f48b88e5c923f267e980f27070147ab74615ad85b5c5f90473d" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "arrow-buffer", "arrow-schema", @@ -362,8 +355,7 @@ dependencies = [ [[package]] name = "arrow-flight" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91efc67a4f5a438833dd76ef674745c80f6f6b9a428a3b440cbfbf74e32867e6" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "arrow-arith", "arrow-array", @@ -389,8 +381,7 @@ dependencies = [ [[package]] name = "arrow-ipc" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a222f0d93772bd058d1268f4c28ea421a603d66f7979479048c429292fac7b2e" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "arrow-array", "arrow-buffer", @@ -403,8 +394,7 @@ dependencies = [ [[package]] name = "arrow-json" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9085342bbca0f75e8cb70513c0807cc7351f1fbf5cb98192a67d5e3044acb033" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "arrow-array", "arrow-buffer", @@ -425,8 +415,7 @@ dependencies = [ [[package]] name = "arrow-ord" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab2f1065a5cad7b9efa9e22ce5747ce826aa3855766755d4904535123ef431e7" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "arrow-array", "arrow-buffer", @@ -438,8 +427,7 @@ dependencies = [ [[package]] name = "arrow-row" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3703a0e3e92d23c3f756df73d2dc9476873f873a76ae63ef9d3de17fda83b2d8" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "arrow-array", "arrow-buffer", @@ -451,8 +439,7 @@ dependencies = [ [[package]] name = "arrow-schema" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73a47aa0c771b5381de2b7f16998d351a6f4eb839f1e13d48353e17e873d969b" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "bitflags 2.9.1", "serde", @@ -462,8 +449,7 @@ dependencies = [ [[package]] name = "arrow-select" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24b7b85575702b23b85272b01bc1c25a01c9b9852305e5d0078c79ba25d995d4" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "ahash 0.8.12", "arrow-array", @@ -476,8 +462,7 @@ dependencies = [ [[package]] name = "arrow-string" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9260fddf1cdf2799ace2b4c2fc0356a9789fa7551e0953e35435536fecefebbd" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "arrow-array", "arrow-buffer", @@ -1441,9 +1426,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.39" +version = "4.5.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd60e63e9be68e5fb56422e397cf9baddded06dae1d2e523401542383bc72a9f" +checksum = "40b6887a1d8685cebccf115538db5c0efe625ccac9696ad45c409d96566e910f" dependencies = [ "clap_builder", "clap_derive", @@ -1451,9 +1436,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.39" +version = "4.5.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89cc6392a1f72bbeb820d71f32108f61fdaf18bc526e1d23954168a67759ef51" +checksum = "e0c66c08ce9f0c698cbce5c0279d0bb6ac936d8674174fe48f736533b964f59e" dependencies = [ "anstream", "anstyle", @@ -1463,9 +1448,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.32" +version = "4.5.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09176aae279615badda0765c0c0b3f6ed53f4709118af73cf4655d85d1530cd7" +checksum = "d2c7947ae4cc3d851207c1adb5b5e260ff0cca11446b1d6d1423788e442257ce" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -1635,7 +1620,7 @@ dependencies = [ "anes", "cast", "ciborium", - "clap 4.5.39", + "clap 4.5.40", "criterion-plot", "futures", "is-terminal", @@ -1954,7 +1939,7 @@ dependencies = [ "async-trait", "aws-config", "aws-credential-types", - "clap 4.5.39", + "clap 4.5.40", "ctor", "datafusion", "dirs", @@ -2606,7 +2591,7 @@ dependencies = [ "bigdecimal", "bytes", "chrono", - "clap 4.5.39", + "clap 4.5.40", "datafusion", "datafusion-spark", "datafusion-substrait", @@ -3973,7 +3958,7 @@ checksum = "5297962ef19edda4ce33aaa484386e0a5b3d7f2f4e037cbeee00503ef6b29d33" dependencies = [ "anstream", "anstyle", - "clap 4.5.39", + "clap 4.5.40", "escape8259", ] @@ -4419,8 +4404,7 @@ dependencies = [ [[package]] name = "parquet" version = "55.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be7b2d778f6b841d37083ebdf32e33a524acde1266b5884a8ca29bf00dfa1231" +source = "git+https://github.com/apache/arrow-rs.git?rev=1029974bc0f03b8adb089bfa8cc8f0ee96701866#1029974bc0f03b8adb089bfa8cc8f0ee96701866" dependencies = [ "ahash 0.8.12", "arrow-array", @@ -6235,9 +6219,9 @@ dependencies = [ [[package]] name = "testcontainers-modules" -version = "0.12.1" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac95cde96549fc19c6bf19ef34cc42bd56e264c1cb97e700e21555be0ecf9e2" +checksum = "7f29549c522bd43086d038c421ed69cdf88bc66387acf3aa92b26f965fa95ec2" dependencies = [ "testcontainers", ] diff --git a/Cargo.toml b/Cargo.toml index 64483eeb93da..0ffc534a873d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,19 +88,19 @@ ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } apache-avro = { version = "0.17", default-features = false } -arrow = { version = "55.1.0", features = [ +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "1029974bc0f03b8adb089bfa8cc8f0ee96701866", features = [ "prettyprint", "chrono-tz", ] } -arrow-buffer = { version = "55.0.0", default-features = false } -arrow-flight = { version = "55.1.0", features = [ +arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "1029974bc0f03b8adb089bfa8cc8f0ee96701866", default-features = false } +arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "1029974bc0f03b8adb089bfa8cc8f0ee96701866", features = [ "flight-sql-experimental", ] } -arrow-ipc = { version = "55.0.0", default-features = false, features = [ +arrow-ipc = { git = "https://github.com/apache/arrow-rs.git", rev = "1029974bc0f03b8adb089bfa8cc8f0ee96701866", default-features = false, features = [ "lz4", ] } -arrow-ord = { version = "55.0.0", default-features = false } -arrow-schema = { version = "55.0.0", default-features = false } +arrow-ord = { git = "https://github.com/apache/arrow-rs.git", rev = "1029974bc0f03b8adb089bfa8cc8f0ee96701866", default-features = false } +arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "1029974bc0f03b8adb089bfa8cc8f0ee96701866", default-features = false } async-trait = "0.1.88" bigdecimal = "0.4.8" bytes = "1.10" @@ -151,7 +151,7 @@ itertools = "0.14" log = "^0.4" object_store = { version = "0.12.0", default-features = false } parking_lot = "0.12" -parquet = { version = "55.1.0", default-features = false, features = [ +parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "1029974bc0f03b8adb089bfa8cc8f0ee96701866", default-features = false, features = [ "arrow", "async", "object_store", diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index a60beaf665e5..8c107c6263f0 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -28,7 +28,7 @@ use datafusion::execution::context::SessionState; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::prelude::SessionContext; use datafusion_common::stats::Precision; -use datafusion_common::DFSchema; +use datafusion_common::{DFSchema, ScalarValue}; use datafusion_execution::cache::cache_manager::CacheManagerConfig; use datafusion_execution::cache::cache_unit::{ DefaultFileStatisticsCache, DefaultListFilesCache, @@ -99,6 +99,35 @@ async fn check_stats_precision_with_filter_pushdown() { ); } +#[tokio::test] +async fn check_stats_inexact_for_truncated_values() { + let filename = "long_strings.parquet"; + let table_path = ListingTableUrl::parse(filename).unwrap(); + + let opt = + ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true); + let table = get_listing_table(&table_path, None, &opt).await; + + let (_, _, state) = get_cache_runtime_state(); + + let exec = table.scan(&state, None, &[], None).await.unwrap(); + assert_eq!( + exec.partition_statistics(None).unwrap().num_rows, + Precision::Exact(2), + "Stats without filter should be exact" + ); + + let col_stats = &exec.partition_statistics(None).unwrap().column_statistics[0]; + assert_eq!( + col_stats.min_value, + Precision::Inexact(ScalarValue::Utf8View(Some("A".repeat(4096)))) + ); + assert_eq!( + col_stats.max_value, + Precision::Inexact(ScalarValue::Utf8View(Some("Z".repeat(4095) + "["))) + ); +} + #[tokio::test] async fn load_table_stats_with_session_level_cache() { let testdata = datafusion::test_util::parquet_test_data(); diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 647fbc8d051e..d555fd47faa0 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -19,13 +19,13 @@ use std::any::Any; use std::cell::RefCell; -use std::fmt; use std::fmt::Debug; use std::ops::Range; use std::rc::Rc; use std::sync::Arc; +use std::{fmt, vec}; -use arrow::array::RecordBatch; +use arrow::array::{ArrayRef, BooleanArray, RecordBatch}; use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit}; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; @@ -36,14 +36,15 @@ use datafusion_datasource::write::{ use datafusion_datasource::file_format::{FileFormat, FileFormatFactory}; use datafusion_datasource::write::demux::DemuxedStreamReceiver; -use arrow::compute::sum; +use arrow::compute::kernels::cmp::eq; +use arrow::compute::{and, sum}; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions}; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; use datafusion_common::{ internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics, - DataFusionError, GetExt, HashSet, Result, DEFAULT_PARQUET_EXTENSION, + DataFusionError, GetExt, HashSet, Result, ScalarValue, DEFAULT_PARQUET_EXTENSION, }; use datafusion_common::{HashMap, Statistics}; use datafusion_common_runtime::{JoinSet, SpawnedTask}; @@ -995,7 +996,7 @@ pub async fn fetch_statistics( /// # When only some columns have statistics: /// /// For columns with statistics: -/// - Min/max values are properly extracted and represented as Precision::Exact +/// - Min/max values are properly extracted and represented as Precision::Exact or Precision::Inexact depending on the `is_max_value_exact` and `is_min_value_exact` flags. /// - Null counts are calculated by summing across row groups /// /// For columns without statistics, @@ -1041,6 +1042,8 @@ pub fn statistics_from_parquet_meta_calc( let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema); let mut null_counts_array = vec![Precision::Exact(0); table_schema.fields().len()]; + let mut is_max_value_exact = vec![Some(true); table_schema.fields().len()]; + let mut is_min_value_exact = vec![Some(true); table_schema.fields().len()]; table_schema .fields() @@ -1057,6 +1060,8 @@ pub fn statistics_from_parquet_meta_calc( &mut min_accs, &mut max_accs, &mut null_counts_array, + &mut is_min_value_exact, + &mut is_max_value_exact, idx, num_rows, &stats_converter, @@ -1076,6 +1081,8 @@ pub fn statistics_from_parquet_meta_calc( null_counts_array, &mut max_accs, &mut min_accs, + &mut is_max_value_exact, + &mut is_min_value_exact, ) } else { Statistics::unknown_column(&table_schema) @@ -1089,21 +1096,39 @@ fn get_col_stats( null_counts: Vec>, max_values: &mut [Option], min_values: &mut [Option], + is_max_value_exact: &mut [Option], + is_min_value_exact: &mut [Option], ) -> Vec { (0..schema.fields().len()) .map(|i| { - let max_value = match max_values.get_mut(i).unwrap() { - Some(max_value) => max_value.evaluate().ok(), - None => None, + let max_value = match ( + max_values.get_mut(i).unwrap(), + is_max_value_exact.get(i).unwrap(), + ) { + (Some(max_value), Some(true)) | (Some(max_value), None) => { + max_value.evaluate().ok().map(Precision::Exact) + } + (Some(max_value), Some(false)) => { + max_value.evaluate().ok().map(Precision::Inexact) + } + (None, _) => None, }; - let min_value = match min_values.get_mut(i).unwrap() { - Some(min_value) => min_value.evaluate().ok(), - None => None, + let min_value = match ( + min_values.get_mut(i).unwrap(), + is_min_value_exact.get(i).unwrap(), + ) { + (Some(min_value), Some(true)) | (Some(min_value), None) => { + min_value.evaluate().ok().map(Precision::Exact) + } + (Some(min_value), Some(false)) => { + min_value.evaluate().ok().map(Precision::Inexact) + } + (None, _) => None, }; ColumnStatistics { null_count: null_counts[i], - max_value: max_value.map(Precision::Exact).unwrap_or(Precision::Absent), - min_value: min_value.map(Precision::Exact).unwrap_or(Precision::Absent), + max_value: max_value.unwrap_or(Precision::Absent), + min_value: min_value.unwrap_or(Precision::Absent), sum_value: Precision::Absent, distinct_count: Precision::Absent, } @@ -1115,6 +1140,8 @@ fn summarize_min_max_null_counts( min_accs: &mut [Option], max_accs: &mut [Option], null_counts_array: &mut [Precision], + is_min_value_exact: &mut [Option], + is_max_value_exact: &mut [Option], arrow_schema_index: usize, num_rows: usize, stats_converter: &StatisticsConverter, @@ -1123,13 +1150,29 @@ fn summarize_min_max_null_counts( let max_values = stats_converter.row_group_maxes(row_groups_metadata)?; let min_values = stats_converter.row_group_mins(row_groups_metadata)?; let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?; + let is_max_value_exact_stat = + stats_converter.row_group_is_max_value_exact(row_groups_metadata)?; + let is_min_value_exact_stat = + stats_converter.row_group_is_min_value_exact(row_groups_metadata)?; if let Some(max_acc) = &mut max_accs[arrow_schema_index] { - max_acc.update_batch(&[max_values])?; + max_acc.update_batch(&[Arc::clone(&max_values)])?; + is_max_value_exact[arrow_schema_index] = get_exactness( + // Safety: `MaxAccumulator::evaluate()` always returns an Ok. + max_acc.evaluate().unwrap(), + max_values, + is_max_value_exact_stat, + ); } if let Some(min_acc) = &mut min_accs[arrow_schema_index] { - min_acc.update_batch(&[min_values])?; + min_acc.update_batch(&[Arc::clone(&min_values)])?; + is_min_value_exact[arrow_schema_index] = get_exactness( + // Safety: `MinAccumulator::evaluate()` always returns an Ok. + min_acc.evaluate().unwrap(), + min_values, + is_min_value_exact_stat, + ); } null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) { @@ -1729,6 +1772,19 @@ fn create_max_min_accs( (max_values, min_values) } +fn get_exactness( + value: ScalarValue, + values: ArrayRef, + exactness: BooleanArray, +) -> Option { + value.to_scalar().ok().and_then(|value| { + eq(&value, &values) + .ok() + .and_then(|mask| and(&mask, &exactness).ok()) + .map(|result| result.iter().filter(|&x| x == Some(true)).count() > 0) + }) +} + #[cfg(test)] mod tests { use std::sync::Arc;