Skip to content

Stop writing statistics to Parquet page headers by default, add option to enable #7594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3788,6 +3788,78 @@ mod tests {
assert_eq!(batches.len(), 0);
}

#[test]
fn test_page_stats_not_written_by_default() {
let string_field = Field::new("a", DataType::Utf8, false);
let schema = Schema::new(vec![string_field]);
let raw_string_values = vec!["Blart Versenwald III"];
let string_values = StringArray::from(raw_string_values.clone());
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

let props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.set_dictionary_enabled(false)
.set_encoding(Encoding::PLAIN)
.set_compression(crate::basic::Compression::UNCOMPRESSED)
.build();

let mut file = roundtrip_opts(&batch, props);

// read file and decode page headers
// Note: use the thrift API as there is no Rust API to access the statistics in the page headers
let mut buf = vec![];
file.seek(std::io::SeekFrom::Start(0)).unwrap();
let read = file.read_to_end(&mut buf).unwrap();
assert!(read > 0);

// decode first page header
let first_page = &buf[4..];
let mut prot = TCompactSliceInputProtocol::new(first_page);
let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
let stats = hdr.data_page_header.unwrap().statistics;

assert!(stats.is_none());
}

#[test]
fn test_page_stats_when_enabled() {
let string_field = Field::new("a", DataType::Utf8, false);
let schema = Schema::new(vec![string_field]);
let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
let string_values = StringArray::from(raw_string_values.clone());
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

let props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.set_dictionary_enabled(false)
.set_encoding(Encoding::PLAIN)
.set_write_page_header_statistics(true)
.set_compression(crate::basic::Compression::UNCOMPRESSED)
.build();

let mut file = roundtrip_opts(&batch, props);

// read file and decode page headers
// Note: use the thrift API as there is no Rust API to access the statistics in the page headers
let mut buf = vec![];
file.seek(std::io::SeekFrom::Start(0)).unwrap();
let read = file.read_to_end(&mut buf).unwrap();
assert!(read > 0);

// decode first page header
let first_page = &buf[4..];
let mut prot = TCompactSliceInputProtocol::new(first_page);
let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
let stats = hdr.data_page_header.unwrap().statistics;

let stats = stats.unwrap();
// check that min/max were actually written to the page
assert!(stats.is_max_value_exact.unwrap());
assert!(stats.is_min_value_exact.unwrap());
assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
}

#[test]
fn test_page_stats_truncation() {
let string_field = Field::new("a", DataType::Utf8, false);
Expand All @@ -3813,6 +3885,7 @@ mod tests {
.set_statistics_truncate_length(Some(2))
.set_dictionary_enabled(false)
.set_encoding(Encoding::PLAIN)
.set_write_page_header_statistics(true)
.set_compression(crate::basic::Compression::UNCOMPRESSED)
.build();

Expand Down
1 change: 1 addition & 0 deletions parquet/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ mod test {
let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
let props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.set_write_page_header_statistics(true)
.build();

let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
Expand Down
39 changes: 34 additions & 5 deletions parquet/src/bin/parquet-rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,24 @@ struct Args {
#[clap(long)]
data_page_size_limit: Option<usize>,

/// Sets max statistics size for all columns.
/// Sets the max length of min/max statistics in row group and data page
/// header statistics for all columns.
///
/// Applicable only if statistics are enabled.
#[clap(long)]
max_statistics_size: Option<usize>,
statistics_truncate_length: Option<usize>,

/// Sets the max length of min/max statistics in the column index.
///
/// Applicable only if statistics are enabled.
#[clap(long)]
column_index_truncate_length: Option<usize>,

/// Write statistics to the data page headers?
///
/// Setting this true will also enable page level statistics.
#[clap(long)]
write_page_header_statistics: Option<bool>,

/// Sets whether bloom filter is enabled for all columns.
#[clap(long)]
Expand Down Expand Up @@ -324,9 +337,16 @@ fn main() {
if let Some(value) = args.data_page_size_limit {
writer_properties_builder = writer_properties_builder.set_data_page_size_limit(value);
}
#[allow(deprecated)]
if let Some(value) = args.max_statistics_size {
writer_properties_builder = writer_properties_builder.set_max_statistics_size(value);
if let Some(value) = args.dictionary_page_size_limit {
writer_properties_builder = writer_properties_builder.set_dictionary_page_size_limit(value);
}
if let Some(value) = args.statistics_truncate_length {
writer_properties_builder =
writer_properties_builder.set_statistics_truncate_length(Some(value));
}
if let Some(value) = args.column_index_truncate_length {
writer_properties_builder =
writer_properties_builder.set_column_index_truncate_length(Some(value));
}
if let Some(value) = args.bloom_filter_enabled {
writer_properties_builder = writer_properties_builder.set_bloom_filter_enabled(value);
Expand All @@ -347,6 +367,15 @@ fn main() {
if let Some(value) = args.statistics_enabled {
writer_properties_builder = writer_properties_builder.set_statistics_enabled(value.into());
}
// set this after statistics_enabled
if let Some(value) = args.write_page_header_statistics {
writer_properties_builder =
writer_properties_builder.set_write_page_header_statistics(value);
if value {
writer_properties_builder =
writer_properties_builder.set_statistics_enabled(EnabledStatistics::Page);
}
}
if let Some(value) = args.writer_version {
writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
}
Expand Down
12 changes: 9 additions & 3 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,8 +1048,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.column_metrics
.update_variable_length_bytes(values_data.variable_length_bytes);

let page_statistics = page_statistics.map(Statistics::from);
let page_statistics = page_statistics.map(|stats| self.truncate_statistics(stats));
// From here on, we only need page statistics if they will be written to the page header.
let page_statistics = page_statistics
.filter(|_| self.props.write_page_header_statistics(self.descr.path()))
.map(|stats| self.truncate_statistics(Statistics::from(stats)));

let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
Expand Down Expand Up @@ -2233,7 +2235,11 @@ mod tests {
let mut buf = Vec::with_capacity(100);
let mut write = TrackedWrite::new(&mut buf);
let page_writer = Box::new(SerializedPageWriter::new(&mut write));
let props = Default::default();
let props = Arc::new(
WriterProperties::builder()
.set_write_page_header_statistics(true)
.build(),
);
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
Expand Down
116 changes: 95 additions & 21 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE;
pub const DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT: usize = 20_000;
/// Default value for [`WriterProperties::statistics_enabled`]
pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
/// Default value for [`WriterProperties::write_page_header_statistics`]
pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false;
/// Default value for [`WriterProperties::max_statistics_size`]
#[deprecated(since = "54.0.0", note = "Unused; will be removed in 56.0.0")]
pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
Expand Down Expand Up @@ -396,6 +398,22 @@ impl WriterProperties {
.unwrap_or(DEFAULT_STATISTICS_ENABLED)
}

/// Returns `true` if [`Statistics`] are to be written to the page header for a column.
///
/// For more details see [`WriterPropertiesBuilder::set_write_page_header_statistics`]
///
/// [`Statistics`]: crate::file::statistics::Statistics
pub fn write_page_header_statistics(&self, col: &ColumnPath) -> bool {
self.column_properties
.get(col)
.and_then(|c| c.write_page_header_statistics())
.or_else(|| {
self.default_column_properties
.write_page_header_statistics()
})
.unwrap_or(DEFAULT_WRITE_PAGE_HEADER_STATISTICS)
}

/// Returns max size for statistics.
///
/// UNUSED
Expand Down Expand Up @@ -544,23 +562,6 @@ impl WriterPropertiesBuilder {
self
}

/// Sets best effort maximum dictionary page size, in bytes (defaults to `1024 * 1024`
/// via [`DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT`]).
///
/// The parquet writer will attempt to limit the size of each
/// `DataPage` used to store dictionaries to this many
/// bytes. Reducing this value will result in larger parquet
/// files, but may improve the effectiveness of page index based
/// predicate pushdown during reading.
///
/// Note: this is a best effort limit based on value of
/// [`set_write_batch_size`](Self::set_write_batch_size).
pub fn set_dictionary_page_size_limit(mut self, value: usize) -> Self {
self.default_column_properties
.set_dictionary_page_size_limit(value);
self
}

/// Sets write batch size (defaults to 1024 via [`DEFAULT_WRITE_BATCH_SIZE`]).
///
/// For performance reasons, data for each column is written in
Expand Down Expand Up @@ -753,7 +754,24 @@ impl WriterPropertiesBuilder {
self
}

/// Sets default statistics level for all columns (defaults to [`Page`] via
/// Sets best effort maximum dictionary page size, in bytes (defaults to `1024 * 1024`
/// via [`DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT`]).
///
/// The parquet writer will attempt to limit the size of each
/// `DataPage` used to store dictionaries to this many
/// bytes. Reducing this value will result in larger parquet
/// files, but may improve the effectiveness of page index based
/// predicate pushdown during reading.
///
/// Note: this is a best effort limit based on value of
/// [`set_write_batch_size`](Self::set_write_batch_size).
pub fn set_dictionary_page_size_limit(mut self, value: usize) -> Self {
self.default_column_properties
.set_dictionary_page_size_limit(value);
self
}

/// Sets default [`EnabledStatistics`] level for all columns (defaults to [`Page`] via
/// [`DEFAULT_STATISTICS_ENABLED`]).
///
/// [`Page`]: EnabledStatistics::Page
Expand All @@ -762,6 +780,33 @@ impl WriterPropertiesBuilder {
self
}

/// enable/disable writing [`Statistics`] in the page header
/// (defaults to `false` via [`DEFAULT_WRITE_PAGE_HEADER_STATISTICS`]).
///
/// Only applicable if [`Page`] level statistics are gathered.
///
/// Setting this value to `true` can greatly increase the size of the resulting Parquet
/// file while yielding very little added benefit. Most modern Parquet implementations
/// will use the min/max values stored in the [`ParquetColumnIndex`] rather than
/// those in the page header.
///
/// # Note
///
/// Prior to version 56.0.0, the `parquet` crate always wrote these
/// statistics (the equivalent of setting this option to `true`). This was
/// changed in 56.0.0 to follow the recommendation in the Parquet
/// specification. See [issue #7580] for more details.
///
/// [`Statistics`]: crate::file::statistics::Statistics
/// [`ParquetColumnIndex`]: crate::file::metadata::ParquetColumnIndex
/// [`Page`]: EnabledStatistics::Page
/// [issue #7580]: https://github.com/apache/arrow-rs/issues/7580
pub fn set_write_page_header_statistics(mut self, value: bool) -> Self {
self.default_column_properties
.set_write_page_header_statistics(value);
self
}

/// Sets default max statistics size for all columns (defaults to `4096` via
/// [`DEFAULT_MAX_STATISTICS_SIZE`]).
///
Expand Down Expand Up @@ -867,7 +912,7 @@ impl WriterPropertiesBuilder {
self
}

/// Sets statistics level for a specific column
/// Sets [`EnabledStatistics`] level for a specific column.
///
/// Takes precedence over [`Self::set_statistics_enabled`].
pub fn set_column_statistics_enabled(
Expand All @@ -879,6 +924,17 @@ impl WriterPropertiesBuilder {
self
}

/// Sets whether to write [`Statistics`] in the page header for a specific column.
///
/// Takes precedence over [`Self::set_write_page_header_statistics`].
///
/// [`Statistics`]: crate::file::statistics::Statistics
pub fn set_column_write_page_header_statistics(mut self, col: ColumnPath, value: bool) -> Self {
self.get_mut_props(col)
.set_write_page_header_statistics(value);
self
}

/// Sets max size for statistics for a specific column.
///
/// Takes precedence over [`Self::set_max_statistics_size`].
Expand Down Expand Up @@ -936,8 +992,12 @@ pub enum EnabledStatistics {
/// Compute page-level and column chunk-level statistics.
///
/// Setting this option will store one set of statistics for each relevant
/// column for each page and row group. The more row groups and the more
/// pages written, the more statistics will be stored.
/// column for each row group. In addition, this will enable the writing
/// of the column index (the offset index is always written regardless of
/// this setting). See [`ParquetColumnIndex`] for
/// more information.
///
/// [`ParquetColumnIndex`]: crate::file::metadata::ParquetColumnIndex
Page,
}

Expand Down Expand Up @@ -1008,6 +1068,7 @@ struct ColumnProperties {
dictionary_page_size_limit: Option<usize>,
dictionary_enabled: Option<bool>,
statistics_enabled: Option<EnabledStatistics>,
write_page_header_statistics: Option<bool>,
#[deprecated(since = "54.0.0", note = "Unused; will be removed in 56.0.0")]
max_statistics_size: Option<usize>,
/// bloom filter related properties
Expand Down Expand Up @@ -1051,6 +1112,11 @@ impl ColumnProperties {
self.statistics_enabled = Some(enabled);
}

/// Sets whether to write statistics in the page header for this column.
fn set_write_page_header_statistics(&mut self, enabled: bool) {
self.write_page_header_statistics = Some(enabled);
}

/// Sets max size for statistics for this column.
#[deprecated(since = "54.0.0", note = "Unused; will be removed in 56.0.0")]
#[allow(deprecated)]
Expand Down Expand Up @@ -1122,6 +1188,14 @@ impl ColumnProperties {
self.statistics_enabled
}

/// Returns `Some(true)` if [`Statistics`] are to be written to the page header for this
/// column.
///
/// [`Statistics`]: crate::file::statistics::Statistics
fn write_page_header_statistics(&self) -> Option<bool> {
self.write_page_header_statistics
}

/// Returns optional max size in bytes for statistics.
#[deprecated(since = "54.0.0", note = "Unused; will be removed in 56.0.0")]
fn max_statistics_size(&self) -> Option<usize> {
Expand Down
Loading
Loading