diff --git a/benches/benchmark.rs b/benches/benchmark.rs index f23084e..1aca395 100644 --- a/benches/benchmark.rs +++ b/benches/benchmark.rs @@ -17,7 +17,9 @@ criterion_group!( bench_threaded_chan, bench_threaded_spsc, bench_threaded_reverse_chan, - bench_threaded_reverse_spsc + bench_threaded_reverse_spsc, + bench_pop_n, + ); criterion_main!(benches); @@ -167,3 +169,54 @@ fn bench_spsc_threaded2(b: &mut Bencher) { c.try_pop(); } } + +fn bench_pop_n(b: &mut Criterion) { + b.bench_function("pop_n_via_pop", |b| { + b.iter_with_setup( + || { + let (p, c) = bounded_spsc_queue::make(500); + for i in 0 .. 500 { + p.push(i) + } + c + }, + |c| { + for _ in 0 .. 500 { + c.pop(); + } + + }, + ) + }); + + b.bench_function("pop_n", |b| { + let mut buf = [0; 500]; + b.iter_with_setup( + || { + let (p, c) = bounded_spsc_queue::make(500); + for i in 0 .. 500 { + p.push(i) + } + c + }, + |c| { + c.pop_n(&mut buf) + }, + ) + }); + + b.bench_function("pop_n_overlapping", |b| { + let mut buf = [0; 500]; + let (p, c) = bounded_spsc_queue::make(500); + b.iter_with_setup( + || { + for i in 0 .. 500 { + p.push(i) + } + }, + move |_| { + c.pop_n(&mut buf) + }, + ) + }); +} diff --git a/src/lib.rs b/src/lib.rs index 9e6c7bd..1db8606 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -125,6 +125,61 @@ impl Buffer { diff } + /// Attempts to pop at most `target.len()` values off the buffer. + /// + /// Returns the amount of values successfully popped. + /// + /// # Examples + /// + /// ``` + /// let mut target = [0; 1024]; + /// let popped = buffer.pop_n(&mut target); + /// ``` + pub fn pop_n(&self, target: &mut [T]) -> usize { + let current_head = self.head.load(Ordering::Relaxed); + + self.shadow_tail.set(self.tail.load(Ordering::Acquire)); + if current_head == self.shadow_tail.get() { + return 0; + } + + let mut diff = self.shadow_tail.get().wrapping_sub(current_head); + if diff > target.len() { + diff = target.len() + } + + + let start = current_head & (self.allocated_size - 1); + let mut mid_point = self.allocated_size - start; + let rest; + if mid_point > diff { + mid_point = diff; + rest = 0; + } else { + rest = diff as isize - mid_point as isize; + }; + + unsafe { + ptr::copy_nonoverlapping( + self.buffer.offset(start as isize), + target.as_mut_ptr(), + mid_point, + ); + + if rest > 0 { + ptr::copy_nonoverlapping( + self.buffer, + target.as_mut_ptr().offset(mid_point as isize), + rest as usize, + ) + } + } + + self.head.store(current_head.wrapping_add(diff), Ordering::Release); + + diff + } + /// Pop a value off the buffer. /// /// This method will block until the buffer is non-empty. The waiting strategy is a simple @@ -513,6 +568,25 @@ impl Consumer { pub fn skip_n(&self, n: usize) -> usize { (*self.buffer).skip_n(n) } + + /// Attempts to pop at most `target.len()` values off the buffer. + /// + /// Returns the amount of values successfully popped. + /// + /// # Examples + /// + /// ``` + /// use bounded_spsc_queue::*; + /// + /// let (_, consumer) = make(100); + /// + /// let mut buffer = [0; 512]; + /// let popped = consumer.pop_n(&mut buffer); // try to pop at most 512 elements + /// ``` + pub fn pop_n(&self, target: &mut[T]) -> usize { + (*self.buffer).pop_n(target) + } + /// Returns the total capacity of this queue /// /// This value represents the total capacity of the queue when it is full. It does not @@ -686,6 +760,68 @@ mod tests { } } + #[test] + fn test_pop_n() { + { + let (p, c) = super::make(500); + for _ in 0 .. 500 { + for i in 0..500 { + p.push(i) + } + + let mut buf = vec![0; 500]; + + assert_eq!(c.pop_n(&mut buf[..]), 500); + assert_eq!(buf, (0..500).collect::>()); + } + } + + { + let (p, c) = super::make(8); + for i in 0..8 { + p.push(i) + } + + let mut buf = vec![0; 8]; + assert_eq!(c.pop_n(&mut buf[..]), 8); + assert_eq!(buf, (0..8).collect::>()); + } + + { + let (p, c) = super::make(500); + for i in 0..500 { + p.push(i) + } + + { + let mut buf = [0, 0, 0]; + + assert_eq!(c.pop_n(&mut buf), 3); + assert_eq!(c.size(), 497); + assert_eq!(buf, [0, 1, 2]); + + assert_eq!(c.pop_n(&mut buf), 3); + assert_eq!(c.size(), 494); + assert_eq!(buf, [3, 4, 5]); + + c.pop(); + c.pop(); + + assert_eq!(c.pop_n(&mut buf), 3); + assert_eq!(c.size(), 489); + assert_eq!(buf, [8, 9, 10]); + } + + { + let mut buf = [0; 1000]; + let expected = 489; + assert_eq!(c.pop_n(&mut buf), expected); + assert_eq!(c.size(), 0); + assert_eq!(&buf[..expected], &(11 .. 500).collect::>()[..]); + } + } + } + extern crate time; use self::time::PreciseTime; use std::sync::mpsc::sync_channel;