diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index a2716ad97..d6105273a 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -42,6 +42,7 @@ pub type SnapshotRef = Arc; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "lowercase")] /// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots. +#[derive(Hash)] pub enum Operation { /// Only data files were added and no files were removed. Append, diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 574904b28..fdfc15417 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -27,6 +27,7 @@ use crate::transaction::Transaction; use crate::transaction::snapshot::{ DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation, }; +use crate::transaction::validate::SnapshotValidator; use crate::writer::file_writer::ParquetWriter; use crate::{Error, ErrorKind}; @@ -209,6 +210,8 @@ impl SnapshotProduceOperation for FastAppendOperation { } } +impl SnapshotValidator for FastAppendOperation {} + #[cfg(test)] mod tests { use crate::scan::tests::TableTestFixture; diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index ba79d60bb..43460bdc8 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -20,6 +20,7 @@ mod append; mod snapshot; mod sort_order; +mod validate; use std::cmp::Ordering; use std::collections::HashMap; diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index a15e17f1d..b9dbaf9f8 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -30,11 +30,12 @@ use crate::spec::{ SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries, }; use crate::transaction::Transaction; +use crate::transaction::validate::SnapshotValidator; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; -pub(crate) trait SnapshotProduceOperation: Send + Sync { +pub(crate) trait SnapshotProduceOperation: Send + SnapshotValidator + Sync { fn operation(&self) -> Operation; #[allow(unused)] fn delete_entries( diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs new file mode 100644 index 000000000..935f93af6 --- /dev/null +++ b/crates/iceberg/src/transaction/validate.rs @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef, TableMetadata}; +use crate::table::Table; + +pub(crate) trait SnapshotValidator { + #[allow(dead_code)] + fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) {} + + #[allow(dead_code)] + async fn validation_history( + &self, + base: &Table, + to_snapshot: &SnapshotRef, + from_snapshot: Option<&SnapshotRef>, + matching_operations: HashSet, + manifest_content_type: ManifestContentType, + ) -> (Vec, HashSet) { + let mut manifests = vec![]; + let mut new_snapshots = HashSet::new(); + let mut last_snapshot: Option<&SnapshotRef> = None; + + let snapshots = Self::ancestors_between(to_snapshot, from_snapshot, base.metadata()); + for current_snapshot in &snapshots { + last_snapshot = Some(current_snapshot); + + if matching_operations.contains(¤t_snapshot.summary().operation) { + new_snapshots.insert(current_snapshot.snapshot_id()); + current_snapshot + .load_manifest_list(base.file_io(), base.metadata()) + .await + .expect("Failed to load manifest list!") + .entries() + .iter() + .for_each(|manifest| { + if manifest.content == manifest_content_type + && manifest.added_snapshot_id == current_snapshot.snapshot_id() + { + manifests.push(manifest.clone()); + } + }); + } + } + + if last_snapshot.is_some() + && last_snapshot.unwrap().parent_snapshot_id() + != from_snapshot.map(|snapshot| snapshot.snapshot_id()) + { + panic!( + "Cannot determine history between starting snapshot {} and the last known ancestor {}", + from_snapshot.map_or_else( + || "None".to_string(), + |snapshot| snapshot.snapshot_id().to_string() + ), + last_snapshot.map_or_else( + || "None".to_string(), + |snapshot| snapshot.parent_snapshot_id().unwrap().to_string() + ) + ); + } + + (manifests, new_snapshots) + } + + /// find ancestors in (from_snapshot, to_snapshot] + /// TODO: Return an iterator instead of a vector + fn ancestors_between( + to_snapshot: &SnapshotRef, + from_snapshot: Option<&SnapshotRef>, + table_metadata: &TableMetadata, + ) -> Vec { + let mut snapshots = Vec::new(); + let mut current_snapshot = Some(to_snapshot); + while let Some(snapshot) = current_snapshot { + snapshots.push(Arc::clone(snapshot)); + match snapshot.parent_snapshot_id() { + Some(parent_snapshot_id) + if from_snapshot.is_some() + && parent_snapshot_id == from_snapshot.unwrap().snapshot_id() => + { + break; + } + Some(parent_snapshot_id) => { + current_snapshot = table_metadata.snapshot_by_id(parent_snapshot_id) + } + None => break, + } + } + + snapshots + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use crate::TableUpdate; + use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestContentType, Operation, + SnapshotRef, Struct, + }; + use crate::transaction::tests::{make_v2_minimal_table, make_v2_table}; + use crate::transaction::validate::SnapshotValidator; + use crate::transaction::{Table, Transaction}; + + struct TestValidator {} + + impl SnapshotValidator for TestValidator {} + + async fn make_v2_table_with_updates() -> (Table, Vec) { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + let mut action = tx.fast_append(None, vec![]).unwrap(); + + let data_file_1 = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/1.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(); + + let data_file_2 = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/2.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(); + + action.add_data_files(vec![data_file_1.clone()]).unwrap(); + let tx = action.apply().await.unwrap(); + let mut action = tx.fast_append(None, vec![]).unwrap(); + action.add_data_files(vec![data_file_2.clone()]).unwrap(); + let tx = action.apply().await.unwrap(); + + (table.clone(), tx.updates) + } + + #[tokio::test] + async fn test_validation_history() { + let (table, updates) = make_v2_table_with_updates().await; + let parent_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + SnapshotRef::new(snapshot.clone()) + } else { + unreachable!() + }; + let current_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[2] { + SnapshotRef::new(snapshot.clone()) + } else { + unreachable!() + }; + + let test_validator = TestValidator {}; + + // specifying from_snapshot, validating up to the from_snapshot + let (manifests, snapshots) = test_validator + .validation_history( + &table, + ¤t_snapshot, + Some(&parent_snapshot), + HashSet::from([Operation::Append]), + ManifestContentType::Data, + ) + .await; + + manifests + .iter() + .for_each(|manifest| assert_eq!(manifest.content, ManifestContentType::Data)); + assert_eq!(snapshots.into_iter().collect::>(), vec![ + current_snapshot.snapshot_id() + ]); + } + + #[test] + fn test_ancestor_between() { + let table = make_v2_table(); + let current_snapshot = table.metadata().current_snapshot(); + let parent_snapshot_id = current_snapshot.unwrap().parent_snapshot_id().unwrap(); + let parent_snapshot = table.metadata().snapshot_by_id(parent_snapshot_id); + + // not specifying from_snapshot, listing all ancestors + let all_ancestors = + TestValidator::ancestors_between(current_snapshot.unwrap(), None, table.metadata()); + assert_eq!( + vec![ + current_snapshot.unwrap().snapshot_id(), + current_snapshot.unwrap().parent_snapshot_id().unwrap() + ], + all_ancestors + .iter() + .map(|snapshot| snapshot.snapshot_id()) + .collect::>() + ); + + // specifying from_snapshot, listing only 1 snapshot + let ancestors = + TestValidator::ancestors_between(current_snapshot.unwrap(), parent_snapshot, table.metadata()); + assert_eq!( + vec![current_snapshot.unwrap().snapshot_id()], + ancestors + .iter() + .map(|snapshot| snapshot.snapshot_id()) + .collect::>() + ); + } +}