diff --git a/src/page/manager.rs b/src/page/manager.rs
index 21996001..2fdcc65d 100644
--- a/src/page/manager.rs
+++ b/src/page/manager.rs
@@ -1,5 +1,6 @@
 use crate::page::PageId;
 
+mod dirty;
 pub(super) mod mmap;
 pub(super) mod options;
diff --git a/src/page/manager/dirty.rs b/src/page/manager/dirty.rs
new file mode 100644
index 00000000..f50417e9
--- /dev/null
+++ b/src/page/manager/dirty.rs
@@ -0,0 +1,243 @@
+use std::collections::{BTreeMap, HashSet};
+
+use crate::page::PageId;
+
+/// A container which tracks dirty pages in a memory-mapped file.
+#[derive(Debug, Default)]
+pub(crate) struct DirtyPages {
+    // A set of reallocated dirty pages which need to be flushed to disk.
+    set: HashSet<PageId>,
+    // Half-open ranges [start, end) of dirty pages, non-overlapping and non-adjacent.
+    runs: BTreeMap<PageId, PageId>,
+}
+
+impl DirtyPages {
+    /// Iterate over the dirty page runs as (byte offset, length) pairs.
+    pub(crate) fn byte_runs<'a>(&'a self) -> impl Iterator<Item = (usize, usize)> + 'a {
+        self.runs
+            .iter()
+            .map(|(&start, &end)| (start.as_offset(), end.as_offset() - start.as_offset()))
+    }
+
+    /// Marks a page as dirty, returning true if the page was newly marked.
+    /// This attempts to find runs adjacent to the page and merge the page into
+    /// them, keeping the runs non-overlapping and non-adjacent.
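+    ///
+    /// An illustrative sketch of the merging behavior (hypothetical usage, not a
+    /// compiled doctest; `page_id!` is the test-only macro used below):
+    ///
+    /// ```ignore
+    /// let mut dirty = DirtyPages::default();
+    /// dirty.mark_dirty(page_id!(5)); // creates run [5, 6)
+    /// dirty.mark_dirty(page_id!(7)); // creates run [7, 8)
+    /// dirty.mark_dirty(page_id!(6)); // joins them into the single run [5, 8)
+    /// assert_eq!(dirty.byte_runs().count(), 1);
+    /// ```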
+    pub(crate) fn mark_dirty(&mut self, page_id: PageId) -> bool {
+        if !self.set.insert(page_id) {
+            return false;
+        }
+
+        let next_page_id = page_id.inc();
+
+        // Find predecessor run: end <= id
+        let predecessor = self.runs.range(..=page_id).next_back();
+        let join_left = predecessor.is_some_and(|(_, &end)| end == page_id);
+
+        // Find successor run: start > id
+        let successor =
+            next_page_id.and_then(|next_page_id| self.runs.range(next_page_id..).next());
+        let join_right = successor.is_some_and(|(&start, _)| start == next_page_id.unwrap());
+
+        match (join_left, join_right) {
+            (true, true) => {
+                // Join predecessor and successor runs into a single run
+                let (&l_start, _) = predecessor.unwrap();
+                let (&r_start, &r_end) = successor.unwrap();
+                self.runs.remove(&l_start);
+                self.runs.remove(&r_start);
+                self.runs.insert(l_start, r_end);
+            }
+            (true, false) => {
+                // Extend predecessor run to include id
+                let (&l_start, _) = predecessor.unwrap();
+                let l_end = self.runs.get_mut(&l_start).unwrap();
+                *l_end = next_page_id.unwrap();
+            }
+            (false, true) => {
+                // Extend successor run backwards to include id
+                let (&r_start, &r_end) = successor.unwrap();
+                self.runs.remove(&r_start);
+                self.runs.insert(page_id, r_end);
+            }
+            (false, false) => {
+                // Create new run containing only the id
+                self.runs.insert(page_id, next_page_id.unwrap());
+            }
+        }
+
+        true
+    }
+
+    /// Clears the dirty pages set and runs.
+    pub(crate) fn clear(&mut self) {
+        self.set.clear();
+        self.runs.clear();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use proptest::prelude::*;
+
+    use crate::page_id;
+
+    use super::*;
+
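+    // The expected offsets in these tests reflect a 4096-byte page size with
+    // page ids starting at 1: `page_id!(1)` maps to byte offset 0, and
+    // `page_id!(100)` maps to 99 * 4096 = 405504.
+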
+    #[test]
+    fn test_clear() {
+        let mut dirty_pages = DirtyPages::default();
+        assert!(dirty_pages.mark_dirty(page_id!(1)));
+        assert!(!dirty_pages.mark_dirty(page_id!(1)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(0, 4096)]);
+
+        dirty_pages.clear();
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(1)));
+        assert!(!dirty_pages.mark_dirty(page_id!(1)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(0, 4096)]);
+
+        dirty_pages.clear();
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![]);
+    }
+
+    #[test]
+    fn test_mark_dirty_noncontiguous_runs() {
+        let mut dirty_pages = DirtyPages::default();
+
+        assert!(dirty_pages.mark_dirty(page_id!(1)));
+        assert!(!dirty_pages.mark_dirty(page_id!(1)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(0, 4096)]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(3)));
+        assert!(!dirty_pages.mark_dirty(page_id!(3)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(0, 4096), (8192, 4096)]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(11)));
+        assert_eq!(
+            dirty_pages.byte_runs().collect::<Vec<_>>(),
+            vec![(0, 4096), (8192, 4096), (40960, 4096)]
+        );
+
+        assert!(dirty_pages.mark_dirty(page_id!(101)));
+        assert_eq!(
+            dirty_pages.byte_runs().collect::<Vec<_>>(),
+            vec![(0, 4096), (8192, 4096), (40960, 4096), (409600, 4096)]
+        );
+
+        dirty_pages.clear();
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![]);
+    }
+
+    #[test]
+    fn test_mark_dirty_extend_right() {
+        let mut dirty_pages = DirtyPages::default();
+
+        assert!(dirty_pages.mark_dirty(page_id!(1)));
+        assert!(!dirty_pages.mark_dirty(page_id!(1)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(0, 4096)]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(2)));
+        assert!(!dirty_pages.mark_dirty(page_id!(2)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(0, 8192)]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(3)));
+        assert!(!dirty_pages.mark_dirty(page_id!(3)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(0, 12288)]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(11)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(0, 12288), (40960, 4096)]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(12)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(0, 12288), (40960, 8192)]);
+
+        dirty_pages.clear();
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![]);
+    }
+
+    #[test]
+    fn test_mark_dirty_extend_left() {
+        let mut dirty_pages = DirtyPages::default();
+
+        assert!(dirty_pages.mark_dirty(page_id!(100)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(405504, 4096)]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(99)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(401408, 8192)]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(98)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(397312, 12288)]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(2)));
+        assert_eq!(
+            dirty_pages.byte_runs().collect::<Vec<_>>(),
+            vec![(4096, 4096), (397312, 12288)]
+        );
+
+        assert!(dirty_pages.mark_dirty(page_id!(1)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(0, 8192), (397312, 12288)]);
+
+        dirty_pages.clear();
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![]);
+    }
+
+    #[test]
+    fn test_mark_dirty_extend_both() {
+        let mut dirty_pages = DirtyPages::default();
+
+        assert!(dirty_pages.mark_dirty(page_id!(100)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(405504, 4096)]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(99)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(401408, 8192)]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(101)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(401408, 12288)]);
+
+        dirty_pages.clear();
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![]);
+    }
+
+    #[test]
+    fn test_mark_dirty_merge_runs() {
+        let mut dirty_pages = DirtyPages::default();
+
+        assert!(dirty_pages.mark_dirty(page_id!(100)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(405504, 4096)]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(98)));
+        assert_eq!(
+            dirty_pages.byte_runs().collect::<Vec<_>>(),
+            vec![(397312, 4096), (405504, 4096)]
+        );
+
+        assert!(dirty_pages.mark_dirty(page_id!(99)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(397312, 12288)]);
+
+        assert!(dirty_pages.mark_dirty(page_id!(102)));
+        assert_eq!(
+            dirty_pages.byte_runs().collect::<Vec<_>>(),
+            vec![(397312, 12288), (413696, 4096)]
+        );
+
+        assert!(dirty_pages.mark_dirty(page_id!(101)));
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![(397312, 20480)]);
+
+        dirty_pages.clear();
+        assert_eq!(dirty_pages.byte_runs().collect::<Vec<_>>(), vec![]);
+    }
+
+    proptest! {
+        #[test]
+        fn test_mark_dirty_random(page_ids in prop::collection::vec(any::<PageId>(), 1..1000)) {
+            let mut dirty_pages = DirtyPages::default();
+            for page_id in page_ids {
+                dirty_pages.mark_dirty(page_id);
+            }
+
+            let runs = dirty_pages.byte_runs().collect::<Vec<_>>();
+            assert!(runs.is_sorted_by_key(|(start, _)| *start));
+            // ensure that runs are non-overlapping and non-adjacent
+            assert!(runs.windows(2).all(|w| w[0].0 + w[0].1 < w[1].0));
+        }
+    }
+}
diff --git a/src/page/manager/mmap.rs b/src/page/manager/mmap.rs
index 41d62087..b03311d2 100644
--- a/src/page/manager/mmap.rs
+++ b/src/page/manager/mmap.rs
@@ -1,5 +1,6 @@
 use crate::{
     page::{
+        manager::dirty::DirtyPages,
         state::{PageState, RawPageState},
         Page, PageError, PageId, PageManagerOptions, PageMut,
     },
@@ -21,6 +22,7 @@ pub struct PageManager {
     file: Mutex<File>,
     file_len: AtomicU64,
     page_count: AtomicU32,
+    dirty_pages: Mutex<DirtyPages>,
 }
 
 impl PageManager {
@@ -104,6 +106,7 @@ impl PageManager {
             file: Mutex::new(file),
             file_len: AtomicU64::new(file_len),
             page_count: AtomicU32::new(opts.page_count),
+            dirty_pages: Mutex::new(DirtyPages::default()),
         })
     }
@@ -216,6 +219,8 @@ impl PageManager {
         // SAFETY: We have checked that the page fits inside the memory map.
         let data = unsafe { self.mmap.as_mut_ptr().byte_add(offset).cast() };
 
+        self.mark_dirty(page_id);
+
         // TODO: This is actually unsafe, as it's possible to call `get()` arbitrary times before
         // calling this function (this will be fixed in a future commit).
         unsafe { PageMut::from_ptr(page_id, snapshot_id, data) }
@@ -237,6 +242,8 @@ impl PageManager {
         // SAFETY: We have checked that the page fits inside the memory map.
         let data = unsafe { self.mmap.as_mut_ptr().byte_add(offset).cast() };
 
+        self.mark_dirty(page_id);
+
         // SAFETY:
         // - This is a newly created page at the end of the file, so we're guaranteed to have
         //   exclusive access to it. Even if another thread was calling `allocate()` at the same
@@ -265,10 +272,22 @@ impl PageManager {
         Ok(matches!(state.load(), PageState::Dirty(_)))
     }
 
-    /// Syncs pages to the backing file.
+    /// Adds a page to the dirty set, to be flushed to disk on the next sync.
+    #[inline]
+    fn mark_dirty(&self, page_id: PageId) {
+        self.dirty_pages.lock().mark_dirty(page_id);
+    }
+
+    /// Syncs dirty pages to the backing file.
+    /// This performs up to N+1 flushes, where N is the number of pages in the dirty set.
     pub fn sync(&self) -> io::Result<()> {
         if cfg!(not(miri)) {
-            self.mmap.flush()
+            let mut dirty_pages = self.dirty_pages.lock();
+            for (offset, length) in dirty_pages.byte_runs() {
+                self.mmap.flush_range(offset, length)?;
+            }
+            dirty_pages.clear();
+            Ok(())
         } else {
             Ok(())
         }
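
For context, below is a standalone sketch (not part of the diff above) of the pattern the new `sync()` relies on: flushing only the dirty byte ranges of a memory map instead of the whole file. It assumes the `memmap2` crate; the path, page size, and hard-coded runs are illustrative stand-ins for what `DirtyPages::byte_runs()` would produce.

use std::{fs::OpenOptions, io};

use memmap2::MmapMut;

const PAGE_SIZE: usize = 4096;

fn main() -> io::Result<()> {
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .open("/tmp/pages.bin")?;
    file.set_len((8 * PAGE_SIZE) as u64)?;

    // SAFETY: nothing else resizes or remaps the file while the map is alive.
    let mut mmap = unsafe { MmapMut::map_mut(&file)? };

    // Dirty the pages at indices 1, 2 (adjacent) and 5: two runs, not three.
    mmap[PAGE_SIZE] = 0xAA;
    mmap[2 * PAGE_SIZE] = 0xBB;
    mmap[5 * PAGE_SIZE] = 0xCC;

    // Coalesced (byte offset, length) pairs, as `byte_runs()` would yield them.
    let runs = [(PAGE_SIZE, 2 * PAGE_SIZE), (5 * PAGE_SIZE, PAGE_SIZE)];
    for (offset, len) in runs {
        // One flush (msync) per contiguous run instead of one per page.
        mmap.flush_range(offset, len)?;
    }
    Ok(())
}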