diff --git a/Cargo.lock b/Cargo.lock index 090ecc9..d57b7fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1106,6 +1106,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.30" @@ -2493,6 +2499,7 @@ dependencies = [ "rand", "regex", "reqwest", + "rstest", "sample-arrow2", "sample-std", "sample-test", @@ -2869,6 +2876,15 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "proc-macro-crate" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" +dependencies = [ + "toml_edit 0.21.1", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -3118,6 +3134,12 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "relative-path" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" + [[package]] name = "reqwest" version = "0.11.27" @@ -3222,6 +3244,36 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rstest" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afd55a67069d6e434a95161415f5beeada95a01c7b815508a82dcb0e1593682" +dependencies = [ + "futures", + "futures-timer", + "rstest_macros", + "rustc_version", +] + +[[package]] +name = "rstest_macros" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4165dfae59a39dd41d8dec720d3cbfbc71f69744efb480a3920f5d4e0cc6798d" +dependencies = [ + "cfg-if", + "glob", + "proc-macro-crate", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version", + "syn 2.0.55", + "unicode-ident", +] + [[package]] name = "rust-embed" version = "8.3.0" @@ -4405,6 +4457,17 @@ dependencies = [ "winnow 0.5.40", ] +[[package]] +name = "toml_edit" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +dependencies = [ + "indexmap 2.2.6", + "toml_datetime", + "winnow 0.5.40", +] + [[package]] name = "toml_edit" version = "0.22.12" diff --git a/server/Cargo.toml b/server/Cargo.toml index cbe5504..3d0020e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -56,6 +56,7 @@ reqwest = { version = "0.11", default-features = false, features = [ "json", "rustls-tls", ] } +rstest = "0.21" test-log = { version = "0.2", default-features = false, features = ["trace"] } uuid = { version = "1", features = ["v4", "fast-rng"] } diff --git a/server/src/chunk.rs b/server/src/chunk.rs index 29fb4ea..6d0e197 100644 --- a/server/src/chunk.rs +++ b/server/src/chunk.rs @@ -348,6 +348,7 @@ pub mod test { use crate::arrow2::array::{ListArray, MutableListArray, StructArray, TryExtend}; use plateau_transport::estimate_size; + // TODO replace with `plateau_test::inference_schema_a` pub fn inferences_schema_a() -> SchemaChunk { let time = 
PrimitiveArray::::from_values(vec![0, 1, 2, 3, 4]); let inputs = PrimitiveArray::::from_values(vec![1.0, 2.0, 3.0, 4.0, 5.0]); diff --git a/server/src/http.rs b/server/src/http.rs index 0d99772..e89cb24 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -319,6 +319,7 @@ pub async fn topic_iterate( let page_size = RowLimit::records(query.page_size.unwrap_or(1000)).min(max_page); let position = position.unwrap_or_default(); let partition_filter = query.partition_filter; + let data_focus = query.data_focus.clone(); let order: Ordering = query.order.unwrap_or(TopicIterationOrder::Asc).into(); let mut result = if let Some(start) = query.start_time { @@ -327,11 +328,11 @@ pub async fn topic_iterate( Err(ErrorReply::InvalidQuery)? } topic - .get_records_by_time(position, times, page_size, partition_filter) + .get_records_by_time(position, times, page_size, partition_filter, data_focus) .await } else { topic - .get_records(position, page_size, order, partition_filter) + .get_records(position, page_size, order, partition_filter, data_focus) .await }; @@ -374,18 +375,19 @@ async fn partition_get_records( let topic = catalog.get_topic(&topic_name).await; let start_record = RecordIndex(query.start); let page_size = RowLimit::records(query.page_size.unwrap_or(1000)).min(max_page); + let data_focus = query.data_focus.clone(); let mut result = if let Some(start) = query.start_time { let times = parse_time_range(start, query.end_time)?; topic .get_partition(&partition_name) .await - .get_records_by_time(start_record, times, page_size) + .get_records_by_time(start_record, times, page_size, data_focus) .await } else { topic .get_partition(&partition_name) .await - .get_records(start_record, page_size, Ordering::Forward) + .get_records(start_record, page_size, Ordering::Forward, data_focus) .await }; diff --git a/server/src/partition.rs b/server/src/partition.rs index 8c416d1..4deadfd 100644 --- a/server/src/partition.rs +++ b/server/src/partition.rs @@ -43,6 +43,7 @@ use futures::FutureExt; use futures::{future, stream}; use metrics::{counter, gauge}; use plateau_transport::arrow2::compute::comparison::lt_scalar; +use plateau_transport::DataFocus; use plateau_transport::SchemaChunk; use serde::{Deserialize, Serialize}; use tokio::sync::{oneshot, watch, RwLock, RwLockReadGuard}; @@ -148,7 +149,7 @@ impl Partition { // rewind until we find a starting index based upon a good segment while index.is_none() { if let Some(ix) = current { - let segment = Slog::segment_from_name(root, slog_name, ix); + let segment = Slog::segment_from_name(root, slog_name, ix, None); let data = manifest.get_segment_data(ix.to_id(id)).await; let valid = segment.validate(); index = match data { @@ -240,6 +241,7 @@ impl Partition { start: RecordIndex, limit: RowLimit, order: Ordering, + focus: DataFocus, ) -> LimitedBatch { let state = self.state.read().await; let segments = self @@ -263,7 +265,7 @@ impl Partition { }; state - .get_records_from_segments(limit, filter, segments, order) + .get_records_from_segments(limit, filter, segments, order, focus) .await } @@ -272,6 +274,7 @@ impl Partition { start: RecordIndex, times: RangeInclusive>, limit: RowLimit, + focus: DataFocus, ) -> LimitedBatch { let state = self.state.read().await; let segments = self @@ -296,14 +299,19 @@ impl Partition { }; state - .get_records_from_segments(limit, filter, segments, Ordering::Forward) + .get_records_from_segments(limit, filter, segments, Ordering::Forward, focus) .await } #[cfg(test)] pub(crate) async fn get_record_by_index(&self, index: 
RecordIndex) -> Option { let record_response = self - .get_records(index, RowLimit::records(1), Ordering::Forward) + .get_records( + index, + RowLimit::records(1), + Ordering::Forward, + DataFocus::default(), + ) .await; record_response @@ -506,6 +514,7 @@ impl State { filter: impl Fn(&IndexedChunk) -> BooleanArray + Send + Sync, indices: impl StreamExt + Send + 'a, order: Ordering, + focus: DataFocus, ) -> LimitedBatch { // record queries can be thought of as filtering the entire record set // across all segments. doing so via simply reading everything would be @@ -530,7 +539,7 @@ impl State { // pagination, so compute those while we have the relevant // ranges easily accessible in `SegmentData`. self.messages - .iter_segment(segment.index, order) + .iter_segment(segment.index, order, focus.clone()) .into_stream() .flat_map(stream::iter) .scan( @@ -692,6 +701,7 @@ pub mod test { max_bytes: 100_000, }, Ordering::Forward, + DataFocus::default(), ) .await .chunks @@ -719,6 +729,7 @@ pub mod test { max_bytes: 100_000, }, Ordering::Forward, + DataFocus::default(), ) .await .chunks @@ -746,6 +757,7 @@ pub mod test { max_bytes: 100_000, }, Ordering::Reverse, + DataFocus::default(), ) .await .chunks @@ -773,6 +785,7 @@ pub mod test { max_bytes: 100_000, }, Ordering::Reverse, + DataFocus::default(), ) .await .chunks @@ -810,6 +823,7 @@ pub mod test { max_bytes: 100_000, }, Ordering::Reverse, + DataFocus::default(), ) .await; @@ -914,7 +928,8 @@ pub mod test { part.get_records( RecordIndex(start), RowLimit::records(limit), - Ordering::Forward + Ordering::Forward, + DataFocus::default(), ) .await .chunks @@ -949,6 +964,7 @@ pub mod test { RecordIndex(0), parse_time(2)..=parse_time(3), RowLimit::default(), + DataFocus::default(), ) .await; @@ -1048,7 +1064,12 @@ pub mod test { { let part = reattach(&dir, spec).await; let result = part - .get_records(RecordIndex(0), RowLimit::default(), Ordering::Forward) + .get_records( + RecordIndex(0), + RowLimit::default(), + Ordering::Forward, + DataFocus::default(), + ) .await; assert_eq!(SegmentChunk::from(result.chunks[0].clone()), chunk_a.chunk); @@ -1072,7 +1093,12 @@ pub mod test { let start = RecordIndex(0); let result = part - .get_records(start, RowLimit::default(), Ordering::Forward) + .get_records( + start, + RowLimit::default(), + Ordering::Forward, + DataFocus::default(), + ) .await; assert_eq!( format!("{:?}", SegmentChunk::from(result.chunks[0].clone())), @@ -1109,7 +1135,9 @@ pub mod test { let order = Ordering::Forward; let start = RecordIndex(0); - let result = part.get_records(start, RowLimit::default(), order).await; + let result = part + .get_records(start, RowLimit::default(), order, DataFocus::default()) + .await; assert_eq!(result.status, BatchStatus::SchemaChanged); assert_eq!( result.chunks, @@ -1121,7 +1149,9 @@ pub mod test { ); let start = result.chunks.last().unwrap().end().unwrap() + 1; - let result = part.get_records(start, RowLimit::default(), order).await; + let result = part + .get_records(start, RowLimit::default(), order, DataFocus::default()) + .await; assert_eq!(result.status, BatchStatus::SchemaChanged); assert_eq!( result.chunks, @@ -1139,7 +1169,9 @@ pub mod test { ); let start = result.chunks.last().unwrap().end().unwrap() + 1; - let result = part.get_records(start, RowLimit::default(), order).await; + let result = part + .get_records(start, RowLimit::default(), order, DataFocus::default()) + .await; assert!(result.status.is_open()); assert_eq!( result.chunks, @@ -1147,7 +1179,9 @@ pub mod test { ); let start = 
result.chunks.last().unwrap().end().unwrap() + 1; - let result = part.get_records(start, RowLimit::default(), order).await; + let result = part + .get_records(start, RowLimit::default(), order, DataFocus::default()) + .await; assert!(result.status.is_open()); assert_eq!(result.chunks, vec![]); } @@ -1194,7 +1228,12 @@ pub mod test { part.commit().await.unwrap(); let result = part - .get_records(RecordIndex(0), RowLimit::default(), Ordering::Forward) + .get_records( + RecordIndex(0), + RowLimit::default(), + Ordering::Forward, + DataFocus::default(), + ) .await; let msgs = result .chunks @@ -1227,7 +1266,12 @@ pub mod test { part.commit().await.unwrap(); let result = part - .get_records(RecordIndex(6), RowLimit::default(), Ordering::Reverse) + .get_records( + RecordIndex(6), + RowLimit::default(), + Ordering::Reverse, + DataFocus::default(), + ) .await; let msgs = result .chunks @@ -1274,7 +1318,12 @@ pub mod test { // iterate schema A let start = RecordIndex(part_len); let result = part - .get_records(start, RowLimit::default(), Ordering::Reverse) + .get_records( + start, + RowLimit::default(), + Ordering::Reverse, + DataFocus::default(), + ) .await; assert_eq!(result.status, BatchStatus::SchemaChanged); assert_eq!(result.schema.unwrap(), chunk_b.schema); @@ -1305,6 +1354,7 @@ pub mod test { result.chunks.last().unwrap().end().unwrap(), RowLimit::default(), Ordering::Reverse, + DataFocus::default(), ) .await; assert_eq!(result.status, BatchStatus::SchemaChanged); @@ -1336,6 +1386,7 @@ pub mod test { result.chunks.last().unwrap().end().unwrap(), RowLimit::default(), Ordering::Reverse, + DataFocus::default(), ) .await; assert!(result.status.is_open()); @@ -1391,7 +1442,7 @@ pub mod test { let root = PathBuf::from(dir.path()); // first, corrupt the last file. 
- let segment = Slog::segment_from_name(root.as_path(), &slog_name, last); + let segment = Slog::segment_from_name(root.as_path(), &slog_name, last, None); debug!("corrupting {last:?}"); let f = fs::File::options().write(true).open(segment.path())?; f.set_len(16)?; @@ -1402,7 +1453,7 @@ pub mod test { manifest.remove_segment(ix.to_id(&spec.1)).await; // delete the next file entirely - let segment = Slog::segment_from_name(root.as_path(), &slog_name, ix.prev().unwrap()); + let segment = Slog::segment_from_name(root.as_path(), &slog_name, ix.prev().unwrap(), None); debug!("deleting {:?}", ix.prev().unwrap()); segment.destroy()?; @@ -1500,7 +1551,12 @@ pub mod test { // fetch from start fast-forwards to first valid index let result = t - .get_records(RecordIndex(0), RowLimit::records(2), Ordering::Forward) + .get_records( + RecordIndex(0), + RowLimit::records(2), + Ordering::Forward, + DataFocus::default(), + ) .await; assert_eq!(result.status, BatchStatus::RecordsExceeded); let start_index = result.chunks.first().unwrap().start().unwrap(); @@ -1511,7 +1567,12 @@ pub mod test { // iterate through from start let result = t - .get_records(start_index, RowLimit::records(2), Ordering::Forward) + .get_records( + start_index, + RowLimit::records(2), + Ordering::Forward, + DataFocus::default(), + ) .await; assert_eq!(result.status, BatchStatus::RecordsExceeded); let next_index = result.chunks.iter().next_back().unwrap().end().unwrap() + 1; @@ -1521,7 +1582,12 @@ pub mod test { ); let result = t - .get_records(next_index, RowLimit::records(2), Ordering::Forward) + .get_records( + next_index, + RowLimit::records(2), + Ordering::Forward, + DataFocus::default(), + ) .await; assert_eq!(result.status, BatchStatus::RecordsExceeded); let next_index = result.chunks.iter().next_back().unwrap().end().unwrap() + 1; @@ -1531,14 +1597,24 @@ pub mod test { ); let result = t - .get_records(next_index, RowLimit::records(2), Ordering::Forward) + .get_records( + next_index, + RowLimit::records(2), + Ordering::Forward, + DataFocus::default(), + ) .await; assert_limit_unreached(&result.status); let next_index = result.chunks.iter().next_back().unwrap().end().unwrap() + 1; assert_eq!(deindex(result.chunks), vec![records[7].clone()]); let result = t - .get_records(next_index, RowLimit::records(2), Ordering::Forward) + .get_records( + next_index, + RowLimit::records(2), + Ordering::Forward, + DataFocus::default(), + ) .await; assert_limit_unreached(&result.status); assert_eq!(deindex(result.chunks), vec![]); @@ -1607,7 +1683,8 @@ pub mod test { t.get_records_by_time( RecordIndex(start), query, - RowLimit::records(limit) + RowLimit::records(limit), + DataFocus::default(), ) .await .chunks @@ -1641,6 +1718,7 @@ pub mod test { max_bytes: 250, }, Ordering::Forward, + DataFocus::default(), ) .await; @@ -1662,6 +1740,7 @@ pub mod test { max_bytes: 1, }, Ordering::Forward, + DataFocus::default(), ) .await; assert_eq!(result.status, BatchStatus::BytesExceeded); diff --git a/server/src/segment.rs b/server/src/segment.rs index 3d668fa..2547120 100644 --- a/server/src/segment.rs +++ b/server/src/segment.rs @@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize}; use tracing::{error, trace, warn}; use crate::arrow2::datatypes::Schema; +use plateau_transport::DataFocus; use plateau_transport::SegmentChunk; #[cfg(test)] @@ -81,11 +82,14 @@ pub trait SegmentIterator: DoubleEndedIterator> { #[derive(Clone, Debug)] pub(crate) struct Segment { path: PathBuf, + focus: Option, } impl Segment { - pub(crate) fn at(path: PathBuf) -> Self { - Self 
{ path } + pub(crate) fn at(path: impl AsRef, focus: impl Into>) -> Self { + let path = path.as_ref().to_path_buf(); + let focus = focus.into(); + Self { path, focus } } pub(crate) fn path(&self) -> &PathBuf { @@ -175,10 +179,10 @@ impl Segment { } pub(crate) fn iter(&self) -> Result { - let cache = cache::read(self.cache_path()).unwrap_or_else(|err| { - error!("error reading cache at {:?}: {err:?}", self.cache_path()); - None - }); + let focus = self.focus.clone().unwrap_or_default(); + let cache = cache::read(self.cache_path()) + .inspect_err(|err| error!("error reading cache at {:?}: {err:?}", self.cache_path())) + .unwrap_or_default(); if self.path.exists() { trace!( @@ -198,7 +202,7 @@ impl Segment { Ok(ReadFormat::Parquet(segment.read(cache)?)) } else if arrow { trace!("{:?} in arrow format", self.path); - let segment = arrow::Segment::new(self.path.clone())?; + let segment = arrow::Segment::new(self.path.clone())?.focus(focus); Ok(ReadFormat::Arrow(segment.read(cache)?)) } else { anyhow::bail!("unable to detect file format for segment {:?}", self.path) @@ -409,316 +413,4 @@ impl Writer { } #[cfg(test)] -pub mod test { - use std::borrow::Borrow; - - use super::*; - use crate::chunk::{iter_legacy, test::inferences_schema_a}; - use chrono::{TimeZone, Utc}; - use plateau_transport::SchemaChunk; - use sample_arrow2::{ - array::ArbitraryArray, - chunk::ArbitraryChunk, - datatypes::{sample_flat, ArbitraryDataType}, - }; - use sample_std::{Chance, Regex}; - use tempfile::tempdir; - use test::arrow::test::partial_write; - - impl Config { - pub fn nocommit() -> Self { - Self { - durable_checkpoints: false, - arrow: false, - } - } - - pub fn parquet() -> Self { - Self { - arrow: false, - ..Self::default() - } - } - - pub fn arrow() -> Self { - Self { - arrow: true, - ..Self::default() - } - } - } - - impl Writer { - pub fn log_arrow + Clone + PartialEq>( - &mut self, - data: SchemaChunk, - active: Option, - ) -> Result<()> { - self.log_arrows(data.schema.borrow(), vec![data.chunk], active) - } - - pub fn update_cache(&mut self, active: SegmentChunk) -> Result<()> { - self.cache.update(self.chunk_ix, &self.schema, active) - } - } - - pub fn build_records>(it: I) -> Vec { - it.map(|(ix, message)| Record { - time: Utc.timestamp_opt(ix, 0).unwrap(), - message: message.into_bytes(), - }) - .collect() - } - - pub fn collect_records( - schema: Schema, - iter: impl Iterator>, - ) -> Vec { - iter_legacy(schema, iter).flat_map(Result::unwrap).collect() - } - - // nulls=true breaks arrow2's parquet support, but is fine for feather - pub fn deep_chunk(depth: usize, len: usize, nulls: bool) -> ArbitraryChunk { - let names = Regex::new("[a-z]{4,8}"); - let data_type = ArbitraryDataType { - struct_branch: 1..3, - names: names.clone(), - nullable: if nulls { Chance(0.5) } else { Chance(0.0) }, - flat: sample_flat, - } - .sample_depth(depth); - - let array = ArbitraryArray { - names, - branch: 0..10, - len: len..(len + 1), - null: Chance(0.1), - // this appears to break arrow2's parquet support - // is_nullable: true, - is_nullable: false, - }; - - ArbitraryChunk { - chunk_len: 10..1000, - array_count: 1..2, - data_type, - array, - } - } - - #[test] - fn test_interrupted_cache_write() -> Result<()> { - let root = tempdir()?; - let path = root.path().join("partial-write.parquet"); - let s = Segment::at(path.clone()); - - let a = inferences_schema_a(); - let mut w = s.create(a.schema.clone(), Config::default())?; - w.log_arrow(a.clone(), Some(a.chunk.clone()))?; - drop(w); - - let f = 
fs::File::options().append(true).open(s.cache_path())?; - f.set_len(f.metadata()?.len() - 15)?; - - let mut r = s.iter()?; - assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk)); - assert_eq!(r.next().map(|v| v.ok()), None); - - Ok(()) - } - - #[test] - fn test_partial_cache_write() -> Result<()> { - let root = tempdir()?; - let path = root.path().join("partial-write.parquet"); - let s = Segment::at(path.clone()); - - let a = inferences_schema_a(); - let mut w = s.create(a.schema.clone(), Config::default())?; - w.log_arrow(a.clone(), Some(a.chunk.clone()))?; - - let more = crate::chunk::concatenate(&[a.chunk.clone(), a.chunk.clone()])?; - w.log_arrows(&a.schema, vec![], Some(more))?; - drop(w); - - let f = fs::File::options().append(true).open(s.cache_path())?; - f.set_len(f.metadata()?.len() - 15)?; - - let mut r = s.iter()?; - assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); - assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk)); - assert_eq!(r.next().map(|v| v.ok()), None); - - Ok(()) - } - - #[test] - fn test_arrow_with_truncated_cache() -> Result<()> { - let root = tempdir()?; - let path = root.path().join("partial-write.arrow"); - let s = Segment::at(path.clone()); - - let a = inferences_schema_a(); - let mut w = s.create(a.schema.clone(), Config::arrow())?; - w.log_arrow(a.clone(), Some(a.chunk.clone()))?; - w.log_arrow(a.clone(), Some(a.chunk.clone()))?; - - let more = crate::chunk::concatenate(&[a.chunk.clone(), a.chunk.clone()])?; - w.log_arrows(&a.schema, vec![], Some(more))?; - drop(w); - - let f = fs::File::options().append(true).open(s.cache_path())?; - f.set_len(f.metadata()?.len() - 15)?; - - let mut r = s.iter()?; - // two chunks from file, one from cache (the other will have its frame - // interrupted by above corruption) - assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); - assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); - assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); - assert_eq!(r.next().map(|v| v.ok()), None); - - Ok(()) - } - - #[test] - fn test_arrow_corruption_with_cache_write() -> Result<()> { - let root = tempdir()?; - let path = root.path().join("partial-write.arrow"); - let s = Segment::at(path.clone()); - - let a = inferences_schema_a(); - let mut w = s.create(a.schema.clone(), Config::arrow())?; - w.log_arrow(a.clone(), Some(a.chunk.clone()))?; - w.log_arrow(a.clone(), Some(a.chunk.clone()))?; - - let more = crate::chunk::concatenate(&[a.chunk.clone(), a.chunk.clone()])?; - w.log_arrows(&a.schema, vec![], Some(more))?; - drop(w); - - let f = fs::File::options().append(true).open(s.path())?; - f.set_len(f.metadata()?.len() - 15)?; - - let mut r = s.iter()?; - assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); - // we need to discard the whole cache because of the gap created above - assert_eq!(r.next().map(|v| v.ok()), None); - - Ok(()) - } - - #[test] - fn test_dual_format() -> Result<()> { - let root = tempdir()?; - let parquet = Segment::at(root.path().join("test.parquet")); - let arrow = Segment::at(root.path().join("test.arrow")); - - let a = inferences_schema_a(); - - let mut w = parquet.create(a.schema.clone(), Config::default())?; - w.log_arrow(a.clone(), Some(a.chunk.clone()))?; - w.end()?; - - let mut w = arrow.create(a.schema.clone(), Config::arrow())?; - w.log_arrow(a.clone(), Some(a.chunk.clone()))?; - w.end()?; - - // verify we don't need to provide the format here, it's autodetected - let mut r = parquet.iter()?; - assert_eq!(r.next().map(|v| v.unwrap()), 
Some(a.chunk.clone())); - assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); - assert_eq!(r.next().map(|v| v.ok()), None); - - let mut r = arrow.iter()?; - assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); - assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk)); - assert_eq!(r.next().map(|v| v.ok()), None); - - Ok(()) - } - - #[test] - fn test_parquet_cache_updates() -> Result<()> { - let root = tempdir()?; - - let a = inferences_schema_a(); - - let all_counts = [1, 3, 4, 2, 1]; - for ix in 1..all_counts.len() { - trace!("iter: {ix} counts: 1 + {:?}", &all_counts[0..ix]); - let mut chunk = a.chunk.clone(); - - let path = root.path().join(format!("{ix:?}.parquet")); - let s = Segment::at(path.clone()); - let mut w = s.create(a.schema.clone(), Config::parquet())?; - - for count in &all_counts[0..ix] { - let new_parts: Vec<_> = std::iter::once(chunk.clone()) - .chain(std::iter::repeat(a.chunk.clone()).take(*count)) - .collect(); - chunk = crate::chunk::concatenate(&new_parts)?; - w.update_cache(chunk.clone())?; - } - - drop(w); - - let mut r = s.iter()?; - assert_eq!(r.next().map(|v| v.unwrap()), Some(chunk)); - } - - Ok(()) - } - - #[test] - fn test_arrow_cache_updates() -> Result<()> { - let root = tempdir()?; - - let a = inferences_schema_a(); - - let all_counts = [1, 3, 4, 2, 1]; - for ix in 1..all_counts.len() { - trace!("iter: {ix} counts: 1 + {:?}", &all_counts[0..ix]); - let mut chunk = a.chunk.clone(); - - let path = root.path().join(format!("{ix:?}.arrow")); - let s = Segment::at(path.clone()); - let mut w = s.create(a.schema.clone(), Config::arrow())?; - - for count in &all_counts[0..ix] { - let new_parts: Vec<_> = std::iter::once(chunk.clone()) - .chain(std::iter::repeat(a.chunk.clone()).take(*count)) - .collect(); - chunk = crate::chunk::concatenate(&new_parts)?; - w.update_cache(chunk.clone())?; - } - - drop(w); - - let mut r = s.iter()?; - assert_eq!(r.next().map(|v| v.unwrap()), Some(chunk)); - } - - Ok(()) - } - - #[test] - fn test_partial_write_size_destroy() -> Result<()> { - let root = tempdir()?; - let a = inferences_schema_a(); - let arrow_segment = partial_write(root.path(), a.clone())?; - - let paths: Vec<_> = arrow_segment.clone().parts().collect(); - let segment = Segment::at(arrow_segment.into_path()); - - segment.iter()?.count(); - - assert!(segment.size_estimate()? 
> fs::metadata(&segment.path)?.len() as usize); - segment.destroy()?; - - for path in paths { - assert!(!path.exists()); - } - - Ok(()) - } -} +pub mod test; diff --git a/server/src/segment/arrow.rs b/server/src/segment/arrow.rs index 5af6740..af412e7 100644 --- a/server/src/segment/arrow.rs +++ b/server/src/segment/arrow.rs @@ -13,6 +13,7 @@ use std::io::{Read, Seek, SeekFrom}; use std::ops::Range; use std::path::{Path, PathBuf}; +use plateau_transport::DataFocus; use plateau_transport::SegmentChunk; use tracing::{error, trace, warn}; @@ -43,6 +44,7 @@ pub struct Segment { path: PathBuf, recovery_path: PathBuf, recovered_path: PathBuf, + focus: DataFocus, } impl Segment { @@ -52,15 +54,21 @@ impl Segment { anyhow::ensure!(ext != "recovered"); let recovery_path = path.with_extension("recovery"); - let recovered_path = path.with_extension("recovery"); + let recovered_path = path.with_extension("recovered"); + let focus = DataFocus::default(); Ok(Self { path, recovery_path, recovered_path, + focus, }) } + pub fn focus(self, focus: DataFocus) -> Self { + Self { focus, ..self } + } + fn directory(&self) -> anyhow::Result { let mut parent = self.path.clone(); parent.pop(); @@ -68,15 +76,15 @@ impl Segment { } pub fn read(&self, cache: Option) -> anyhow::Result { - if self.recovered_path.exists() { + let reader = if self.recovered_path.exists() { trace!(?self.recovered_path, "reading from"); Reader::open(&self.recovered_path) } else { - Reader::open(&self.path).or_else(|err| { - error!(%err, path = %self.path.display(), "error reading segment"); - self.recover(cache).map(|(_, reader)| reader) - }) - } + Reader::open(&self.path) + .inspect_err(|e| error!(%e, path = %self.path.display(), "error reading segment")) + .or_else(|_| self.recover(cache).map(|(_, reader)| reader)) + }?; + Ok(reader.focus(self.focus.clone())) } fn recover(&self, cache: Option) -> anyhow::Result<(usize, Reader)> { @@ -231,8 +239,9 @@ pub struct Reader { dictionaries: Dictionaries, message_scratch: Vec, data_scratch: Vec, - iter_range: Range, + focus: DataFocus, + projection: Option>, } impl Reader { @@ -243,24 +252,38 @@ impl Reader { let message_scratch = vec![]; let metadata = read_file_metadata(&mut file)?; let dictionaries = read_file_dictionaries(&mut file, &metadata, &mut data_scratch)?; + let iter_range = 0..metadata.blocks.len(); + let focus = DataFocus::default(); + let projection = None; Ok(Self { - iter_range: 0..metadata.blocks.len(), - metadata, dictionaries, file, message_scratch, data_scratch, + iter_range, + focus, + projection, }) } + pub fn focus(self, focus: DataFocus) -> Self { + let projection = focus.projection(self.schema()); + + Self { + focus, + projection, + ..self + } + } + fn read_ix(&mut self, ix: usize) -> anyhow::Result { read_batch( &mut self.file, &self.dictionaries, &self.metadata, - None, + self.projection.as_deref(), None, ix, &mut self.message_scratch, diff --git a/server/src/segment/parquet.rs b/server/src/segment/parquet.rs index 12e9dc8..5b3deef 100644 --- a/server/src/segment/parquet.rs +++ b/server/src/segment/parquet.rs @@ -464,7 +464,7 @@ mod test { fn can_iter_forward_double_ended() -> anyhow::Result<()> { let root = tempdir()?; let path = root.path().join("testing.parquet"); - let s = Segment::at(path.clone()); + let s = Segment::at(path.clone(), None); let records = build_records((0..10).map(|i| (i, format!("m{i}")))); let mut w = s.create(legacy_schema(), Config::parquet())?; @@ -487,7 +487,7 @@ mod test { fn can_iter_reverse_double_ended() -> anyhow::Result<()> { let root = 
tempdir()?; let path = root.path().join("testing.parquet"); - let s = Segment::at(path.clone()); + let s = Segment::at(path.clone(), None); let mut records = build_records((0..10).map(|i| (i, format!("m{i}")))); let mut w = s.create(legacy_schema(), Config::parquet())?; @@ -510,7 +510,7 @@ mod test { #[test] fn round_trip1_2() -> anyhow::Result<()> { let path = PathBuf::from("tests/data/v1.parquet"); - let s = Segment::at(path); + let s = Segment::at(path, None); let records = build_records((0..20).map(|ix| (ix, format!("message-{ix}")))); let r = s.iter()?; @@ -522,7 +522,7 @@ mod test { fn round_trip2() -> anyhow::Result<()> { let root = tempdir()?; let path = root.path().join("testing.parquet"); - let s = Segment::at(path); + let s = Segment::at(path, None); let records = build_records((0..20).map(|ix| (ix, format!("message-{ix}")))); let schema = legacy_schema(); @@ -547,7 +547,7 @@ mod test { fn schema_change() -> anyhow::Result<()> { let root = tempdir()?; let path = root.path().join("testing.parquet"); - let s = Segment::at(path); + let s = Segment::at(path, None); let a = inferences_schema_a(); let mut w = s.create(a.schema.clone(), Config::parquet())?; @@ -562,7 +562,7 @@ mod test { fn schema_file_metadata() -> anyhow::Result<()> { let root = tempdir()?; let path = root.path().join("testing.parquet"); - let s = Segment::at(path); + let s = Segment::at(path, None); let mut a = inferences_schema_a(); a.schema @@ -586,7 +586,7 @@ mod test { fn nested() -> anyhow::Result<()> { let root = tempdir()?; let path = root.path().join("testing.parquet"); - let s = Segment::at(path); + let s = Segment::at(path, None); let a = inferences_nested(); let mut w = s.create(a.schema.clone(), Config::parquet())?; @@ -598,7 +598,7 @@ mod test { fn large_records() -> anyhow::Result<()> { let root = tempdir()?; let path = root.path().join("testing.parquet"); - let s = Segment::at(path); + let s = Segment::at(path, None); let large: String = (0..100 * 1024).map(|_| "x").collect(); let records = build_records((0..20).map(|ix| (ix, format!("message-{ix}-{large}")))); @@ -624,7 +624,7 @@ mod test { fn test_open_drop_recovery() -> anyhow::Result<()> { let root = tempdir()?; let path = root.path().join("open-drop.parquet"); - let s = Segment::at(path.clone()); + let s = Segment::at(path.clone(), None); let a = inferences_schema_a(); let mut w = s.create(a.schema.clone(), Config::parquet())?; @@ -671,7 +671,7 @@ mod test { let chunk = chunk.value; let root = tempdir().unwrap(); let path = root.path().join("testing.parquet"); - let s = Segment::at(path); + let s = Segment::at(path, None); use sample_std::Sample; let mut name = Regex::new("[a-z]{4, 8}"); @@ -710,7 +710,7 @@ mod test { let chunks = chunk.value; let root = tempdir().unwrap(); let path = root.path().join("testing.parquet"); - let s = Segment::at(path); + let s = Segment::at(path, None); let mut name = Regex::new("[a-z]{4, 8}"); let mut g = Random::new(); diff --git a/server/src/segment/test.rs b/server/src/segment/test.rs new file mode 100644 index 0000000..0c0c58d --- /dev/null +++ b/server/src/segment/test.rs @@ -0,0 +1,472 @@ +use std::borrow::Borrow; + +use chrono::{TimeZone, Utc}; +use plateau_test::inferences_schema_a; +use plateau_transport::arrow2::datatypes::DataType; +use plateau_transport::SchemaChunk; +use sample_arrow2::{ + array::ArbitraryArray, + chunk::ArbitraryChunk, + datatypes::{sample_flat, ArbitraryDataType}, +}; +use sample_std::{Chance, Regex}; +use tempfile::{tempdir, TempDir}; +use test::arrow::test::partial_write; + +use 
super::*; + +use crate::chunk::iter_legacy; + +impl Config { + pub fn nocommit() -> Self { + Self { + durable_checkpoints: false, + arrow: false, + } + } + + pub fn parquet() -> Self { + Self { + arrow: false, + ..Self::default() + } + } + + pub fn arrow() -> Self { + Self { + arrow: true, + ..Self::default() + } + } +} + +impl Writer { + pub fn log_arrow + Clone + PartialEq>( + &mut self, + data: SchemaChunk, + active: Option, + ) -> Result<()> { + self.log_arrows(data.schema.borrow(), vec![data.chunk], active) + } + + pub fn update_cache(&mut self, active: SegmentChunk) -> Result<()> { + self.cache.update(self.chunk_ix, &self.schema, active) + } +} + +pub fn build_records>(it: I) -> Vec { + it.map(|(ix, message)| Record { + time: Utc.timestamp_opt(ix, 0).unwrap(), + message: message.into_bytes(), + }) + .collect() +} + +pub fn collect_records( + schema: Schema, + iter: impl Iterator>, +) -> Vec { + iter_legacy(schema, iter).flat_map(Result::unwrap).collect() +} + +// nulls=true breaks arrow2's parquet support, but is fine for feather +pub fn deep_chunk(depth: usize, len: usize, nulls: bool) -> ArbitraryChunk { + let names = Regex::new("[a-z]{4,8}"); + let data_type = ArbitraryDataType { + struct_branch: 1..3, + names: names.clone(), + nullable: if nulls { Chance(0.5) } else { Chance(0.0) }, + flat: sample_flat, + } + .sample_depth(depth); + + let array = ArbitraryArray { + names, + branch: 0..10, + len: len..(len + 1), + null: Chance(0.1), + // this appears to break arrow2's parquet support + // is_nullable: true, + is_nullable: false, + }; + + ArbitraryChunk { + chunk_len: 10..1000, + array_count: 1..2, + data_type, + array, + } +} + +#[test_log::test] +fn test_interrupted_cache_write() -> Result<()> { + let root = tempdir()?; + let path = root.path().join("partial-write.parquet"); + let s = Segment::at(path.clone(), None); + + let a = inferences_schema_a(); + let mut w = s.create(a.schema.clone(), Config::default())?; + w.log_arrow(a.clone(), Some(a.chunk.clone()))?; + drop(w); + + let f = fs::File::options().append(true).open(s.cache_path())?; + f.set_len(f.metadata()?.len() - 15)?; + + let mut r = s.iter()?; + assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk)); + assert_eq!(r.next().map(|v| v.ok()), None); + + Ok(()) +} + +#[test_log::test] +fn test_partial_cache_write() -> Result<()> { + let root = tempdir()?; + let path = root.path().join("partial-write.parquet"); + let s = Segment::at(path.clone(), None); + + let a = inferences_schema_a(); + let mut w = s.create(a.schema.clone(), Config::default())?; + w.log_arrow(a.clone(), Some(a.chunk.clone()))?; + + let more = crate::chunk::concatenate(&[a.chunk.clone(), a.chunk.clone()])?; + w.log_arrows(&a.schema, vec![], Some(more))?; + drop(w); + + let f = fs::File::options().append(true).open(s.cache_path())?; + f.set_len(f.metadata()?.len() - 15)?; + + let mut r = s.iter()?; + assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); + assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk)); + assert_eq!(r.next().map(|v| v.ok()), None); + + Ok(()) +} + +#[test_log::test] +fn test_arrow_with_truncated_cache() -> Result<()> { + let root = tempdir()?; + let path = root.path().join("partial-write.arrow"); + let s = Segment::at(path.clone(), None); + + let a = inferences_schema_a(); + let mut w = s.create(a.schema.clone(), Config::arrow())?; + w.log_arrow(a.clone(), Some(a.chunk.clone()))?; + w.log_arrow(a.clone(), Some(a.chunk.clone()))?; + + let more = crate::chunk::concatenate(&[a.chunk.clone(), a.chunk.clone()])?; + 
w.log_arrows(&a.schema, vec![], Some(more))?; + drop(w); + + let f = fs::File::options().append(true).open(s.cache_path())?; + f.set_len(f.metadata()?.len() - 15)?; + + let mut r = s.iter()?; + // two chunks from file, one from cache (the other will have its frame + // interrupted by above corruption) + assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); + assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); + assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); + assert_eq!(r.next().map(|v| v.ok()), None); + + Ok(()) +} + +#[test_log::test] +fn test_arrow_corruption_with_cache_write() -> Result<()> { + let root = tempdir()?; + let path = root.path().join("partial-write.arrow"); + let s = Segment::at(path.clone(), None); + + let a = inferences_schema_a(); + let mut w = s.create(a.schema.clone(), Config::arrow())?; + w.log_arrow(a.clone(), Some(a.chunk.clone()))?; + w.log_arrow(a.clone(), Some(a.chunk.clone()))?; + + let more = crate::chunk::concatenate(&[a.chunk.clone(), a.chunk.clone()])?; + w.log_arrows(&a.schema, vec![], Some(more))?; + drop(w); + + let f = fs::File::options().append(true).open(s.path())?; + f.set_len(f.metadata()?.len() - 15)?; + + let mut r = s.iter()?; + assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); + // we need to discard the whole cache because of the gap created above + assert_eq!(r.next().map(|v| v.ok()), None); + + Ok(()) +} + +#[test_log::test] +fn test_dual_format() -> Result<()> { + let root = tempdir()?; + let parquet = Segment::at(root.path().join("test.parquet"), None); + let arrow = Segment::at(root.path().join("test.arrow"), None); + + let a = inferences_schema_a(); + + let mut w = parquet.create(a.schema.clone(), Config::default())?; + w.log_arrow(a.clone(), Some(a.chunk.clone()))?; + w.end()?; + + let mut w = arrow.create(a.schema.clone(), Config::arrow())?; + w.log_arrow(a.clone(), Some(a.chunk.clone()))?; + w.end()?; + + // verify we don't need to provide the format here, it's autodetected + let mut r = parquet.iter()?; + assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); + assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); + assert_eq!(r.next().map(|v| v.ok()), None); + + let mut r = arrow.iter()?; + assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk.clone())); + assert_eq!(r.next().map(|v| v.unwrap()), Some(a.chunk)); + assert_eq!(r.next().map(|v| v.ok()), None); + + Ok(()) +} + +#[test_log::test] +fn test_parquet_cache_updates() -> Result<()> { + let root = tempdir()?; + + let a = inferences_schema_a(); + + let all_counts = [1, 3, 4, 2, 1]; + for ix in 1..all_counts.len() { + trace!("iter: {ix} counts: 1 + {:?}", &all_counts[0..ix]); + let mut chunk = a.chunk.clone(); + + let path = root.path().join(format!("{ix:?}.parquet")); + let s = Segment::at(path.clone(), None); + let mut w = s.create(a.schema.clone(), Config::parquet())?; + + for count in &all_counts[0..ix] { + let new_parts: Vec<_> = std::iter::once(chunk.clone()) + .chain(std::iter::repeat(a.chunk.clone()).take(*count)) + .collect(); + chunk = crate::chunk::concatenate(&new_parts)?; + w.update_cache(chunk.clone())?; + } + + drop(w); + + let mut r = s.iter()?; + assert_eq!(r.next().map(|v| v.unwrap()), Some(chunk)); + } + + Ok(()) +} + +#[test_log::test] +fn test_arrow_cache_updates() -> Result<()> { + let root = tempdir()?; + + let a = inferences_schema_a(); + + let all_counts = [1, 3, 4, 2, 1]; + for ix in 1..all_counts.len() { + trace!("iter: {ix} counts: 1 + {:?}", &all_counts[0..ix]); + let mut 
chunk = a.chunk.clone(); + + let path = root.path().join(format!("{ix:?}.arrow")); + let s = Segment::at(path.clone(), None); + let mut w = s.create(a.schema.clone(), Config::arrow())?; + + for count in &all_counts[0..ix] { + let new_parts: Vec<_> = std::iter::once(chunk.clone()) + .chain(std::iter::repeat(a.chunk.clone()).take(*count)) + .collect(); + chunk = crate::chunk::concatenate(&new_parts)?; + w.update_cache(chunk.clone())?; + } + + drop(w); + + let mut r = s.iter()?; + assert_eq!(r.next().map(|v| v.unwrap()), Some(chunk)); + } + + Ok(()) +} + +#[test_log::test] +fn test_partial_write_size_destroy() -> Result<()> { + let root = tempdir()?; + let a = inferences_schema_a(); + let arrow_segment = partial_write(root.path(), a.clone())?; + + let paths: Vec<_> = arrow_segment.clone().parts().collect(); + let segment = Segment::at(arrow_segment.into_path(), None); + + segment.iter()?.count(); + + assert!(segment.size_estimate()? > fs::metadata(&segment.path)?.len() as usize); + segment.destroy()?; + + for path in paths { + assert!(!path.exists()); + } + + Ok(()) +} + +mod focus { + use super::*; + use rstest::rstest; + + fn prepare_segment( + segment: impl AsRef, + finalize_cache: bool, + ) -> Result<(TempDir, PathBuf)> { + let root = tempdir()?; + let segment = root.path().join(segment); + let arrow = Segment::at(root.path().join(&segment), None); + let a = inferences_schema_a(); + let mut w = arrow.create(a.schema.clone(), Config::arrow())?; + w.log_arrow(a.clone(), Some(a.chunk.clone()))?; + if finalize_cache { + w.end()?; + } + + Ok((root, segment)) + } + + #[test_log::test(rstest)] + #[case::without_cache(false)] + #[case::with_cache(true)] + fn compare_with_unfocus(#[case] cache: bool) -> Result<()> { + let (_root, path) = prepare_segment("focus.arrow", cache)?; + + // Make sure unfocused data has all the datasets + let mut full = Segment::at(&path, None).iter()?; + let chunk1 = full.next().unwrap().unwrap(); + let chunk2 = full.next().unwrap().unwrap(); + assert!(full.next().is_none()); + assert_eq!(chunk1.arrays().len(), 4); + assert_eq!(chunk2.arrays().len(), 4); + + // And now narrow down the reader to only one dataset + let focus = DataFocus::with_dataset("time"); + let mut focused = Segment::at(&path, focus).iter()?; + let chunk1 = focused.next().unwrap().unwrap(); + let chunk2 = focused.next().unwrap().unwrap(); + assert!(focused.next().is_none()); + assert_eq!(chunk1.arrays().len(), 1); + assert_eq!(chunk2.arrays().len(), 1); + + Ok(()) + } + + #[test_log::test(rstest)] + #[case::without_cache(false)] + #[case::with_cache(true)] + fn include_single(#[case] cache: bool) -> Result<()> { + let (_root, path) = prepare_segment("focus.arrow", cache)?; + + let focus = DataFocus::with_dataset("time"); + let mut focused = Segment::at(path, focus).iter()?; + let arrays1 = focused.next().unwrap().unwrap().into_arrays(); + let arrays2 = focused.next().unwrap().unwrap().into_arrays(); + assert!(focused.next().is_none()); + + assert_eq!(arrays1.len(), 1); + assert_eq!(arrays2.len(), 1); + + assert_eq!(arrays1[0].len(), 5); + assert_eq!(arrays2[0].len(), 5); + + assert_eq!(arrays1[0].data_type(), &DataType::Int64); + assert_eq!(arrays2[0].data_type(), &DataType::Int64); + + Ok(()) + } + + #[test_log::test(rstest)] + #[case::without_cache(false)] + #[case::with_cache(true)] + fn include_multiple(#[case] cache: bool) -> Result<()> { + let (_root, path) = prepare_segment("focus.arrow", cache)?; + + let focus = DataFocus::with_datasets(&["inputs", "outputs"]); + let mut focused = 
Segment::at(path, focus).iter()?; + let arrays1 = focused.next().unwrap().unwrap().into_arrays(); + let arrays2 = focused.next().unwrap().unwrap().into_arrays(); + assert!(focused.next().is_none()); + + assert_eq!(arrays1.len(), 2); + assert_eq!(arrays2.len(), 2); + + assert_eq!(arrays1[0].len(), 5); + assert_eq!(arrays2[0].len(), 5); + + assert_eq!(arrays1[0].data_type(), &DataType::Float32); + assert_eq!(arrays2[0].data_type(), &DataType::Float32); + + Ok(()) + } + + #[test_log::test(rstest)] + #[case::without_cache(false)] + #[case::with_cache(true)] + fn exclude_single(#[case] cache: bool) -> Result<()> { + let (_root, path) = prepare_segment("focus.arrow", cache)?; + let focus = DataFocus::without_dataset("time"); + let mut focused = Segment::at(path, focus).iter()?; + let arrays1 = focused.next().unwrap().unwrap().into_arrays(); + let arrays2 = focused.next().unwrap().unwrap().into_arrays(); + assert!(focused.next().is_none()); + + assert_eq!(arrays1.len(), 3); + assert_eq!(arrays2.len(), 3); + + assert_eq!(arrays1[0].len(), 5); + assert_eq!(arrays1[1].len(), 5); + assert_eq!(arrays1[2].len(), 5); + + assert_eq!(arrays2[0].len(), 5); + assert_eq!(arrays2[1].len(), 5); + assert_eq!(arrays2[2].len(), 5); + + assert!(matches!(arrays1[0].data_type(), DataType::List(_))); + assert!(matches!(arrays1[1].data_type(), DataType::Float32)); + assert!(matches!(arrays1[2].data_type(), DataType::Struct(_))); + + assert!(matches!(arrays2[0].data_type(), DataType::List(_))); + assert!(matches!(arrays2[1].data_type(), DataType::Float32)); + assert!(matches!(arrays2[2].data_type(), DataType::Struct(_))); + + Ok(()) + } + + #[test_log::test(rstest)] + #[case::without_cache(false)] + #[case::with_cache(true)] + fn exclude_multiple(#[case] cache: bool) -> Result<()> { + let (_root, path) = prepare_segment("focus.arrow", cache)?; + let focus = DataFocus::without_datasets(&["inputs", "outputs"]); + let mut focused = Segment::at(path, focus).iter()?; + let arrays1 = focused.next().unwrap().unwrap().into_arrays(); + let arrays2 = focused.next().unwrap().unwrap().into_arrays(); + assert!(focused.next().is_none()); + + assert_eq!(arrays1.len(), 2); + assert_eq!(arrays2.len(), 2); + + assert_eq!(arrays1[0].len(), 5); + assert_eq!(arrays1[1].len(), 5); + + assert_eq!(arrays2[0].len(), 5); + assert_eq!(arrays2[1].len(), 5); + + assert!(matches!(arrays1[0].data_type(), DataType::Int64)); + assert!(matches!(arrays1[1].data_type(), DataType::List(_))); + assert!(matches!(arrays2[0].data_type(), DataType::Int64)); + assert!(matches!(arrays2[1].data_type(), DataType::List(_))); + + Ok(()) + } +} diff --git a/server/src/slog.rs b/server/src/slog.rs index 2851e48..8537494 100644 --- a/server/src/slog.rs +++ b/server/src/slog.rs @@ -36,6 +36,7 @@ use bytesize::ByteSize; use chrono::{DateTime, Utc}; use metrics::counter; use plateau_client::estimate_size; +use plateau_transport::DataFocus; use plateau_transport::{SchemaChunk, SegmentChunk}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -371,22 +372,31 @@ impl Slog { } fn segment_path(root: &Path, name: &str, segment_ix: SegmentIndex) -> PathBuf { - let file = PathBuf::from(format!("{}-{}", name, segment_ix.0)); - [root, file.as_path()].into_iter().collect() + root.join(format!("{}-{}", name, segment_ix.0)) } - pub(crate) fn segment_from_name(root: &Path, name: &str, segment_ix: SegmentIndex) -> Segment { - Segment::at(Self::segment_path(root, name, segment_ix)) + pub(crate) fn segment_from_name( + root: &Path, + name: &str, + segment_ix: SegmentIndex, + 
focus: impl Into>, + ) -> Segment { + Segment::at(Self::segment_path(root, name, segment_ix), focus) } - pub(crate) fn get_segment(&self, segment_ix: SegmentIndex) -> Segment { - Self::segment_from_name(&self.root, &self.name, segment_ix) + pub(crate) fn get_segment( + &self, + segment_ix: SegmentIndex, + focus: impl Into>, + ) -> Segment { + Self::segment_from_name(&self.root, &self.name, segment_ix, focus) } pub(crate) async fn iter_segment( &self, ix: SegmentIndex, order: Ordering, + focus: DataFocus, ) -> Box> + Send> { let state = self.state.read().await; if ix > state.active_checkpoint.segment { @@ -414,7 +424,7 @@ impl Slog { })); } - let segment = self.get_segment(ix); + let segment = self.get_segment(ix, focus); if !Path::new(segment.path()).exists() { return Box::new(std::iter::empty()); } @@ -441,7 +451,7 @@ impl Slog { } pub(crate) fn destroy(&self, segment_ix: SegmentIndex) -> anyhow::Result<()> { - self.get_segment(segment_ix).destroy() + self.get_segment(segment_ix, None).destroy() } pub(crate) async fn cached_segment_data(&self) -> Vec { @@ -701,7 +711,7 @@ fn spawn_slog_thread( active_chunk, })) => { trace!("{}: received request for {:?}", name, records); - let new_segment = Slog::segment_from_name(&root, &name, segment); + let new_segment = Slog::segment_from_name(&root, &name, segment, None); current = current.and_then(|(schema, writer, id)| { if id != segment { trace!("{}: segment change {:?} {:?}", name, id, segment); @@ -797,7 +807,8 @@ mod test { impl Slog { async fn get_record(&self, ix: SegmentIndex, relative: usize) -> Option { - self.iter_segment(ix, Ordering::Forward) + let focus = DataFocus::default(); + self.iter_segment(ix, Ordering::Forward, focus) .await .flat_map(|chunk| LegacyRecords::try_from(chunk).unwrap().0) .nth(relative) diff --git a/server/src/topic.rs b/server/src/topic.rs index 2825b3f..7791589 100644 --- a/server/src/topic.rs +++ b/server/src/topic.rs @@ -18,7 +18,9 @@ use chrono::{DateTime, Utc}; use futures::future::{join_all, FutureExt}; use futures::stream; use futures::stream::StreamExt; -use plateau_transport::{PartitionFilter, PartitionSelector, SchemaChunk, TopicIterator}; +use plateau_transport::{ + DataFocus, PartitionFilter, PartitionSelector, SchemaChunk, TopicIterator, +}; use tokio::sync::{RwLock, RwLockReadGuard}; use tracing::debug; @@ -236,13 +238,17 @@ impl Topic { limit: RowLimit, order: Ordering, partition_filter: PartitionFilter, + focus: DataFocus, ) -> TopicRecordResponse { + let focus = &focus; self.get_records_from_all( starts, limit, order, |partition, start, partition_limit, order| async move { - partition.get_records(start, partition_limit, order).await + partition + .get_records(start, partition_limit, order, focus.clone()) + .await }, partition_filter, ) @@ -255,15 +261,17 @@ impl Topic { times: RangeInclusive>, limit: RowLimit, partition_filter: PartitionFilter, + focus: DataFocus, ) -> TopicRecordResponse { let times = × + let focus = &focus; self.get_records_from_all( starts, limit, Ordering::Forward, |partition, start, partition_limit, _order| async move { partition - .get_records_by_time(start, times.clone(), partition_limit) + .get_records_by_time(start, times.clone(), partition_limit, focus.clone()) .await }, partition_filter, @@ -547,7 +555,13 @@ mod test { // fetching two records will spill from partition-0, which has only one // record in the given time range, to partition-1 let result = topic - .get_records_by_time(HashMap::new(), span.clone(), RowLimit::records(2), None) + .get_records_by_time( + 
HashMap::new(), + span.clone(), + RowLimit::records(2), + None, + DataFocus::default(), + ) .await; assert_eq!(result.batch.status, BatchStatus::RecordsExceeded); assert_eq!(result.iter, expected_it); @@ -565,7 +579,13 @@ mod test { // next fetch will use partition with lowest index in iterator, partition-2 let prior_it = result.iter; let result = topic - .get_records_by_time(prior_it, span.clone(), RowLimit::records(1), None) + .get_records_by_time( + prior_it, + span.clone(), + RowLimit::records(1), + None, + DataFocus::default(), + ) .await; let expected_it: TopicIterator = names.clone().into_iter().zip([2, 1, 1]).collect(); assert_eq!(result.batch.status, BatchStatus::RecordsExceeded); @@ -584,7 +604,13 @@ mod test { // final fetch will fetch both remaining records from partition-1 and partition-2 let prior_it = result.iter; let result = topic - .get_records_by_time(prior_it, span.clone(), RowLimit::records(5), None) + .get_records_by_time( + prior_it, + span.clone(), + RowLimit::records(5), + None, + DataFocus::default(), + ) .await; let expected_it: TopicIterator = names.clone().into_iter().zip([2, 2, 2]).collect(); assert_limit_unreached(&result.batch.status); @@ -603,7 +629,13 @@ mod test { // no more records left let prior_it = result.iter; let result = topic - .get_records_by_time(prior_it, span, RowLimit::records(1), None) + .get_records_by_time( + prior_it, + span, + RowLimit::records(1), + None, + DataFocus::default(), + ) .await; assert_limit_unreached(&result.batch.status); assert_eq!(result.iter, expected_it); @@ -641,7 +673,13 @@ mod test { // first, we zip through all schema-a chunks at the start let result = topic - .get_records(HashMap::new(), many_rows, Ordering::Forward, None) + .get_records( + HashMap::new(), + many_rows, + Ordering::Forward, + None, + DataFocus::default(), + ) .await; assert_eq!(result.iter, to_iter(&names, [15, 5, 20])); assert_eq!(result.batch.status, BatchStatus::SchemaChanged); @@ -650,7 +688,13 @@ mod test { // we set a row limit here so we can test final iteration over all // partitions with schema-b later. let result = topic - .get_records(result.iter, RowLimit::records(5), Ordering::Forward, None) + .get_records( + result.iter, + RowLimit::records(5), + Ordering::Forward, + None, + DataFocus::default(), + ) .await; assert_eq!(result.iter, to_iter(&names, [15, 10, 20])); assert_eq!(result.batch.status, BatchStatus::RecordsExceeded); @@ -658,7 +702,13 @@ mod test { // that same partition is still the min partition, but has // briefly changed back to schema-a let result = topic - .get_records(result.iter, many_rows, Ordering::Forward, None) + .get_records( + result.iter, + many_rows, + Ordering::Forward, + None, + DataFocus::default(), + ) .await; assert_eq!(result.iter, to_iter(&names, [15, 15, 20])); assert_eq!(result.batch.status, BatchStatus::SchemaChanged); @@ -666,7 +716,13 @@ mod test { // now it has changed back to schema-b, and we can resume iterating // through all partitions. 
let result = topic - .get_records(result.iter, many_rows, Ordering::Forward, None) + .get_records( + result.iter, + many_rows, + Ordering::Forward, + None, + DataFocus::default(), + ) .await; assert_eq!(result.iter, to_iter(&names, [25, 25, 25])); assert!(result.batch.status.is_open()); @@ -699,7 +755,13 @@ mod test { // first, we zip through all schema-b chunks at the start let result = topic - .get_records(HashMap::new(), many_rows, Ordering::Reverse, None) + .get_records( + HashMap::new(), + many_rows, + Ordering::Reverse, + None, + DataFocus::default(), + ) .await; assert_eq!(result.iter["p1"], 15); assert_eq!(result.iter["p2"], 15); @@ -709,7 +771,13 @@ mod test { // now we'll get the bulk of the schema-a chunks let result = topic - .get_records(result.iter, RowLimit::default(), Ordering::Reverse, None) + .get_records( + result.iter, + RowLimit::default(), + Ordering::Reverse, + None, + DataFocus::default(), + ) .await; assert_eq!(result.iter["p1"], 0); assert_eq!(result.iter["p2"], 10); @@ -719,7 +787,13 @@ mod test { // the final schema-b chunk let result = topic - .get_records(result.iter, RowLimit::default(), Ordering::Reverse, None) + .get_records( + result.iter, + RowLimit::default(), + Ordering::Reverse, + None, + DataFocus::default(), + ) .await; assert_eq!(result.iter["p1"], 0); assert_eq!(result.iter["p2"], 5); @@ -729,7 +803,13 @@ mod test { // and the final schema-a chunk let result = topic - .get_records(result.iter, RowLimit::default(), Ordering::Reverse, None) + .get_records( + result.iter, + RowLimit::default(), + Ordering::Reverse, + None, + DataFocus::default(), + ) .await; assert_eq!(result.iter["p1"], 0); assert_eq!(result.iter["p2"], 0); @@ -778,6 +858,7 @@ mod test { RowLimit::records(fetch_count), Ordering::Forward, Some(filter.clone()), + DataFocus::default(), ) .await; @@ -840,6 +921,7 @@ mod test { RowLimit::records(fetch_count), Ordering::Forward, Some(filter.clone()), + DataFocus::default(), ) .await; @@ -897,6 +979,7 @@ mod test { RowLimit::records(fetch_count), Ordering::Forward, Some(filter.clone()), + DataFocus::default(), ) .await; @@ -951,6 +1034,7 @@ mod test { RowLimit::records(fetch_count), Ordering::Forward, Some(vec![PartitionSelector::from("regex:partition-.*")]), + DataFocus::default(), ) .await; @@ -1000,7 +1084,13 @@ mod test { assert!(records.len() % fetch_count != 0); for _ in 0..records.len() { let result = topic - .get_records(it, RowLimit::records(fetch_count), Ordering::Forward, None) + .get_records( + it, + RowLimit::records(fetch_count), + Ordering::Forward, + None, + DataFocus::default(), + ) .await; if fetched.len() + fetch_count >= records.len() { @@ -1041,7 +1131,13 @@ mod test { assert!(records.len() % fetch_count != 0); for _ in 0..records.len() { let result = topic - .get_records(it, RowLimit::records(fetch_count), Ordering::Reverse, None) + .get_records( + it, + RowLimit::records(fetch_count), + Ordering::Reverse, + None, + DataFocus::default(), + ) .await; if fetched.len() + fetch_count >= records.len() { @@ -1103,6 +1199,7 @@ mod test { RowLimit::records(5), Ordering::Forward, None, + DataFocus::default(), ) .await; assert_eq!( @@ -1117,6 +1214,7 @@ mod test { RowLimit::records(5), Ordering::Reverse, None, + DataFocus::default(), ) .await; assert_eq!( @@ -1149,7 +1247,13 @@ mod test { // verify second tranche let result = topic - .get_records(iter, RowLimit::records(5), Ordering::Reverse, None) + .get_records( + iter, + RowLimit::records(5), + Ordering::Reverse, + None, + DataFocus::default(), + ) .await; assert_eq!( 
vec!["r14", "r13", "r12", "r11", "r10"], @@ -1193,7 +1297,13 @@ mod test { // verify tranche spanning partitions let result = topic - .get_records(iter, RowLimit::records(7), Ordering::Reverse, None) + .get_records( + iter, + RowLimit::records(7), + Ordering::Reverse, + None, + DataFocus::default(), + ) .await; assert_eq!( vec!["r9", "r8", "r7", "r6", "r5", "r4", "r3"], @@ -1214,6 +1324,7 @@ mod test { RowLimit::records(1000), Ordering::Reverse, None, + DataFocus::default(), ) .await; assert_eq!(ITER_PARTITIONS + 1, result.iter.len()); @@ -1235,7 +1346,13 @@ mod test { // verify final tranche let result = topic - .get_records(iter, RowLimit::records(10), Ordering::Reverse, None) + .get_records( + iter, + RowLimit::records(10), + Ordering::Reverse, + None, + DataFocus::default(), + ) .await; assert_eq!(vec!["r2", "r1", "r0"], batch_to_vec(result.batch)); @@ -1280,7 +1397,12 @@ mod test { topic .get_partition("partition-0") .await - .get_records(RecordIndex(0), RowLimit::records(1000), Ordering::Forward) + .get_records( + RecordIndex(0), + RowLimit::records(1000), + Ordering::Forward, + DataFocus::default() + ) .await .chunks ), diff --git a/test/src/lib.rs b/test/src/lib.rs index 653f147..1a4bc5c 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -157,3 +157,7 @@ pub fn inferences_schema_a() -> SchemaChunk { .unwrap(), } } + +pub fn default() -> T { + T::default() +} diff --git a/transport/src/lib.rs b/transport/src/lib.rs index d030fcc..10fc12a 100644 --- a/transport/src/lib.rs +++ b/transport/src/lib.rs @@ -9,11 +9,12 @@ use std::{ use arrow2::bitmap::Bitmap; use arrow2::compute::aggregate::estimated_bytes_size; +use arrow2::datatypes::Field; +use arrow2::datatypes::Schema; use arrow2::{ array::{Array, StructArray}, chunk::Chunk, compute::concatenate::concatenate, - datatypes::Field, io::ipc::{read, write}, }; use regex::Regex; @@ -216,6 +217,38 @@ pub struct DataFocus { } impl DataFocus { + pub fn with_dataset(dataset: impl ToString) -> Self { + let dataset = vec![dataset.to_string()]; + Self { + dataset, + ..Self::default() + } + } + + pub fn with_datasets(datasets: &[impl ToString]) -> Self { + let dataset = datasets.iter().map(ToString::to_string).collect(); + Self { + dataset, + ..Self::default() + } + } + + pub fn without_dataset(dataset: impl ToString) -> Self { + let exclude = vec![dataset.to_string()]; + Self { + exclude, + ..Self::default() + } + } + + pub fn without_datasets(datasets: &[impl ToString]) -> Self { + let exclude = datasets.iter().map(ToString::to_string).collect(); + Self { + exclude, + ..Self::default() + } + } + pub fn is_some(&self) -> bool { !self.dataset.is_empty() || !self.exclude.is_empty() @@ -233,6 +266,40 @@ impl DataFocus { *arr = arr.with_validity(Some(all_null)); } } + + pub fn is_everything(&self) -> bool { + self.dataset.first().is_some_and(|ds| ds == "*") + } + + pub fn include(&self) -> HashSet<&String> { + self.dataset.iter().collect() + } + + pub fn exclude(&self) -> HashSet<&String> { + self.exclude.iter().collect() + } + + pub fn projection(&self, schema: &ArrowSchema) -> Option> { + if self.is_some() { + if self.is_everything() { + None + } else { + let include = self.include(); + let exclude = self.exclude(); + let projection = schema + .fields + .iter() + .enumerate() + .filter(|(_, field)| !exclude.contains(&field.name)) + .filter(|(_, field)| include.is_empty() || include.contains(&field.name)) + .map(|(idx, _)| idx) + .collect(); + Some(projection) + } + } else { + None + } + } } #[derive(Debug, Default, Deserialize, Serialize, 
IntoParams)]
@@ -636,7 +703,7 @@ impl SchemaChunk {
             }
         });
 
-        let exclude: HashSet<&String> = focus.exclude.iter().collect();
+        let exclude = focus.exclude();
 
         for path in paths {
             let split = focus.dataset_separator.as_ref().map_or_else(
@@ -752,7 +819,7 @@ impl SchemaChunk {
     }
 }
 
-fn contains_null_type(schema: &arrow2::datatypes::Schema) -> bool {
+fn contains_null_type(schema: &Schema) -> bool {
     schema
         .fields
         .iter()