diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs
index 922e7bab9..569959d6f 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -30,7 +30,8 @@ use super::table_metadata::SnapshotLog;
 use crate::error::{timestamp_ms_to_utc, Result};
 use crate::io::FileIO;
 use crate::spec::{ManifestList, SchemaId, SchemaRef, StructType, TableMetadata};
-use crate::{Error, ErrorKind};
+use crate::transaction::Transaction;
+use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
 
 /// The ref name of the main branch of the table.
 pub const MAIN_BRANCH: &str = "main";
@@ -216,6 +217,227 @@ impl Snapshot {
     }
 }
 
+/// `ManageSnapshots` collects snapshot-reference updates and their requirements for a transaction.
+#[allow(dead_code)] // NOTE(review): presumably until `updates`/`requirements` are consumed
+pub struct ManageSnapshots<'a> {
+    transaction: &'a Transaction<'a>,
+    updates: Vec<TableUpdate>,
+    requirements: Vec<TableRequirement>,
+}
+
+impl<'a> ManageSnapshots<'a> {
+    /// Creates a new `ManageSnapshots` for `transaction` with no pending updates or requirements.
+    pub fn new(transaction: &'a Transaction) -> Self {
+        Self {
+            transaction,
+            updates: Vec::new(),
+            requirements: Vec::new(),
+        }
+    }
+
+    /// Sets a snapshot reference and returns the update and requirement
+    ///
+    /// # Arguments
+    ///
+    /// * `snapshot_id` - The snapshot ID that the reference will point to
+    /// * `ref_name` - The name of the snapshot reference.
+    /// * `update_type` - The type of the reference ("tag" or "branch").
+    /// * `max_ref_age_ms` - Optional maximum age (in milliseconds) for the reference.
+    /// * `max_snapshot_age_ms` - Optional maximum age (in milliseconds) for the snapshot.
+    /// * `min_snapshots_to_keep` - Optional minimum number of snapshots to keep.
+    ///
+    /// # Returns
+    ///
+    /// A `Result` containing a tuple, (`TableUpdate`, `TableRequirement`),
+    /// or an error if an invalid `update_type` is provided.
+    pub fn set_ref_snapshot(
+        &self,
+        snapshot_id: i64,
+        ref_name: String,
+        update_type: &str,
+        max_ref_age_ms: Option<i64>,
+        max_snapshot_age_ms: Option<i64>,
+        min_snapshots_to_keep: Option<i32>,
+    ) -> Result<(TableUpdate, TableRequirement)> {
+        let retention = match update_type {
+            "tag" => SnapshotRetention::Tag { max_ref_age_ms },
+            "branch" => SnapshotRetention::Branch {
+                min_snapshots_to_keep,
+                max_snapshot_age_ms,
+                max_ref_age_ms,
+            },
+            other => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Snapshot type, '{}', does not exist", other),
+                ))
+            }
+        };
+
+        let new_ref = SnapshotReference::new(snapshot_id, retention);
+
+        let update = TableUpdate::SetSnapshotRef {
+            ref_name: ref_name.clone(),
+            reference: new_ref,
+        };
+
+        let current_snapshot_id = self
+            .transaction
+            .current_table()
+            .metadata()
+            .refs
+            .get(&ref_name)
+            .map(|r| r.snapshot_id);
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: ref_name,
+            snapshot_id: current_snapshot_id,
+        };
+
+        Ok((update, requirement))
+    }
+
+    /// Removes a snapshot reference.
+    ///
+    /// # Arguments
+    ///
+    /// * `ref_name` - The name of the snapshot reference to remove.
+    ///
+    /// # Returns
+    ///
+    /// A mutable reference to `ManageSnapshots` for method chaining on success,
+    /// or an error if the reference does not exist.
+    pub fn remove_ref_snapshot(&mut self, ref_name: String) -> Result<&mut Self> {
+        let snapshot_id = match self
+            .transaction
+            .current_table()
+            .metadata()
+            .refs
+            .get(&ref_name)
+        {
+            Some(r) => r.snapshot_id,
+            None => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Reference '{}' does not exist", ref_name),
+                ));
+            }
+        };
+
+        let update = TableUpdate::RemoveSnapshotRef {
+            ref_name: ref_name.clone(),
+        };
+
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: ref_name,
+            snapshot_id: Some(snapshot_id),
+        };
+
+        self.updates.push(update);
+        self.requirements.push(requirement);
+        Ok(self)
+    }
+
+    /// Creates a new tag
+    ///
+    /// # Arguments
+    ///
+    /// * `snapshot_id` - The snapshot ID to tag.
+    /// * `tag_name` - The name of the tag.
+    /// * `max_ref_age_ms` - Optional maximum age (in milliseconds) for the tag reference.
+    ///
+    /// # Returns
+    ///
+    /// A mutable reference to `ManageSnapshots` for method chaining,
+    /// or an error if the operation fails.
+    pub fn create_tag(
+        &mut self,
+        snapshot_id: i64,
+        tag_name: String,
+        max_ref_age_ms: Option<i64>,
+    ) -> Result<&mut Self> {
+        let (update, requirement) = self.set_ref_snapshot(
+            snapshot_id,
+            tag_name,
+            "tag",
+            max_ref_age_ms,
+            None,
+            None,
+        )?;
+
+        self.updates.push(update);
+        self.requirements.push(requirement);
+
+        Ok(self)
+    }
+
+    /// Removes a tag.
+    ///
+    /// # Arguments
+    ///
+    /// * `tag_name` - The name of the tag to remove.
+    ///
+    /// # Returns
+    ///
+    /// A mutable reference to `ManageSnapshots` for method chaining,
+    /// or an error if the tag reference does not exist.
+    pub fn remove_tag(&mut self, tag_name: String) -> Result<&mut Self> {
+        self.remove_ref_snapshot(tag_name)
+    }
+
+    /// Creates a new branch
+    ///
+    /// # Arguments
+    ///
+    /// * `snapshot_id` - The snapshot ID the branch will point to.
+    /// * `branch_name` - The name of the branch to create.
+    /// * `max_ref_age_ms` - Optional maximum age (in milliseconds) for the branch reference.
+    /// * `max_snapshot_age_ms` - Optional maximum age (in milliseconds) for the snapshots.
+    /// * `min_snapshots_to_keep` - Optional minimum number of snapshots to retain.
+    ///
+    /// # Returns
+    ///
+    /// A mutable reference to `ManageSnapshots` for method chaining,
+    /// or an error if the operation fails.
+    pub fn create_branch(
+        &mut self,
+        snapshot_id: i64,
+        branch_name: String,
+        max_ref_age_ms: Option<i64>,
+        max_snapshot_age_ms: Option<i64>,
+        min_snapshots_to_keep: Option<i32>,
+    ) -> Result<&mut Self> {
+        // Propagate a failure from `set_ref_snapshot` to the caller instead of
+        // panicking; this method already returns `Result`.
+        let (update, requirement) = self.set_ref_snapshot(
+            snapshot_id,
+            branch_name,
+            "branch",
+            max_ref_age_ms,
+            max_snapshot_age_ms,
+            min_snapshots_to_keep,
+        )?;
+
+        self.updates.push(update);
+        self.requirements.push(requirement);
+
+        Ok(self)
+    }
+
+    /// Removes a branch.
+    ///
+    /// # Arguments
+    ///
+    /// * `branch_name` - The name of the branch to remove.
+    ///
+    /// # Returns
+    ///
+    /// A mutable reference to `ManageSnapshots` for method chaining,
+    /// or an error if the branch reference does not exist.
+    pub fn remove_branch(&mut self, branch_name: String) -> Result<&mut Self> {
+        self.remove_ref_snapshot(branch_name)
+    }
+}
+
 pub(super) mod _serde {
     /// This is a helper module that defines types to help with serialization/deserialization.
     /// For deserialization the input first gets read into either the [SnapshotV1] or [SnapshotV2] struct
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index 6ae25775b..5fb73c618 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -55,6 +55,11 @@ impl<'a> Transaction<'a> {
         }
     }
 
+    /// Returns a shared reference to this transaction's `current_table`.
+    pub fn current_table(&self) -> &Table {
+        &self.current_table
+    }
+
     fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> {
         let mut metadata_builder = self.current_table.metadata().clone().into_builder(None);
         for update in updates {