diff --git a/crates/vsock/src/vhu_vsock_thread.rs b/crates/vsock/src/vhu_vsock_thread.rs index 0c2ab2f3c..a3790468f 100644 --- a/crates/vsock/src/vhu_vsock_thread.rs +++ b/crates/vsock/src/vhu_vsock_thread.rs @@ -462,76 +462,81 @@ impl VhostUserVsockThread { let queue = vring_mut.get_queue_mut(); - while let Some(mut avail_desc) = queue - .iter(atomic_mem.memory()) - .map_err(|_| Error::IterateQueue)? - .next() - { - used_any = true; - let mem = atomic_mem.clone().memory(); - - let head_idx = avail_desc.head_index(); - let used_len = match VsockPacket::from_rx_virtq_chain( - mem.deref(), - &mut avail_desc, - self.tx_buffer_size, - ) { - Ok(mut pkt) => { - let recv_result = match rx_queue_type { - RxQueueType::Standard => self.thread_backend.recv_pkt(&mut pkt), - RxQueueType::RawPkts => self.thread_backend.recv_raw_pkt(&mut pkt), - }; - - if recv_result.is_ok() { - PKT_HEADER_SIZE + pkt.len() as usize - } else { - queue.iter(mem).unwrap().go_to_previous_position(); - break; + let mut iter_has_elemnt = true; + while iter_has_elemnt { + let queue_iter = queue + .iter(atomic_mem.memory()) + .map_err(|_| Error::IterateQueue)?; + + iter_has_elemnt = false; + for mut avail_desc in queue_iter { + used_any = true; + iter_has_elemnt = true; + let mem = atomic_mem.clone().memory(); + + let head_idx = avail_desc.head_index(); + let used_len = match VsockPacket::from_rx_virtq_chain( + mem.deref(), + &mut avail_desc, + self.tx_buffer_size, + ) { + Ok(mut pkt) => { + let recv_result = match rx_queue_type { + RxQueueType::Standard => self.thread_backend.recv_pkt(&mut pkt), + RxQueueType::RawPkts => self.thread_backend.recv_raw_pkt(&mut pkt), + }; + + if recv_result.is_ok() { + PKT_HEADER_SIZE + pkt.len() as usize + } else { + queue.iter(mem).unwrap().go_to_previous_position(); + break; + } } - } - Err(e) => { - warn!("vsock: RX queue error: {:?}", e); - 0 - } - }; + Err(e) => { + warn!("vsock: RX queue error: {:?}", e); + 0 + } + }; - let vring = vring.clone(); - let event_idx = self.event_idx; + let vring = vring.clone(); + let event_idx = self.event_idx; - self.pool.spawn_ok(async move { - // TODO: Understand why doing the following in the pool works - if event_idx { - if vring.add_used(head_idx, used_len as u32).is_err() { - warn!("Could not return used descriptors to ring"); - } - match vring.needs_notification() { - Err(_) => { - warn!("Could not check if queue needs to be notified"); - vring.signal_used_queue().unwrap(); + self.pool.spawn_ok(async move { + // TODO: Understand why doing the following in the pool works + if event_idx { + if vring.add_used(head_idx, used_len as u32).is_err() { + warn!("Could not return used descriptors to ring"); } - Ok(needs_notification) => { - if needs_notification { + match vring.needs_notification() { + Err(_) => { + warn!("Could not check if queue needs to be notified"); vring.signal_used_queue().unwrap(); } + Ok(needs_notification) => { + if needs_notification { + vring.signal_used_queue().unwrap(); + } + } } + } else { + if vring.add_used(head_idx, used_len as u32).is_err() { + warn!("Could not return used descriptors to ring"); + } + vring.signal_used_queue().unwrap(); } - } else { - if vring.add_used(head_idx, used_len as u32).is_err() { - warn!("Could not return used descriptors to ring"); - } - vring.signal_used_queue().unwrap(); - } - }); + }); - match rx_queue_type { - RxQueueType::Standard => { - if !self.thread_backend.pending_rx() { - break; + match rx_queue_type { + RxQueueType::Standard => { + if !self.thread_backend.pending_rx() { + break; + } } - } - RxQueueType::RawPkts => { - if !self.thread_backend.pending_raw_pkts() { - break; + RxQueueType::RawPkts => { + if !self.thread_backend.pending_raw_pkts() { + break; + } } } } @@ -616,68 +621,75 @@ impl VhostUserVsockThread { None => return Err(Error::NoMemoryConfigured), }; - while let Some(mut avail_desc) = vring - .get_mut() - .get_queue_mut() - .iter(atomic_mem.memory()) - .map_err(|_| Error::IterateQueue)? - .next() - { - used_any = true; - let mem = atomic_mem.clone().memory(); - - let head_idx = avail_desc.head_index(); - let pkt = match VsockPacket::from_tx_virtq_chain( - mem.deref(), - &mut avail_desc, - self.tx_buffer_size, - ) { - Ok(pkt) => pkt, - Err(e) => { - dbg!("vsock: error reading TX packet: {:?}", e); - continue; + let mut vring_mut = vring.get_mut(); + + let queue = vring_mut.get_queue_mut(); + + let mut iter_has_elemnt = true; + while iter_has_elemnt { + let queue_iter = queue + .iter(atomic_mem.memory()) + .map_err(|_| Error::IterateQueue)?; + + iter_has_elemnt = false; + for mut avail_desc in queue_iter { + iter_has_elemnt = true; + used_any = true; + let mem = atomic_mem.clone().memory(); + + let head_idx = avail_desc.head_index(); + let pkt = match VsockPacket::from_tx_virtq_chain( + mem.deref(), + &mut avail_desc, + self.tx_buffer_size, + ) { + Ok(pkt) => pkt, + Err(e) => { + dbg!("vsock: error reading TX packet: {:?}", e); + continue; + } + }; + + if self.thread_backend.send_pkt(&pkt).is_err() { + vring + .get_mut() + .get_queue_mut() + .iter(mem) + .unwrap() + .go_to_previous_position(); + break; } - }; - - if self.thread_backend.send_pkt(&pkt).is_err() { - vring - .get_mut() - .get_queue_mut() - .iter(mem) - .unwrap() - .go_to_previous_position(); - break; - } - // TODO: Check if the protocol requires read length to be correct - let used_len = 0; + // TODO: Check if the protocol requires read length to be correct + let used_len = 0; - let vring = vring.clone(); - let event_idx = self.event_idx; + let vring = vring.clone(); + let event_idx = self.event_idx; - self.pool.spawn_ok(async move { - if event_idx { - if vring.add_used(head_idx, used_len as u32).is_err() { - warn!("Could not return used descriptors to ring"); - } - match vring.needs_notification() { - Err(_) => { - warn!("Could not check if queue needs to be notified"); - vring.signal_used_queue().unwrap(); + self.pool.spawn_ok(async move { + if event_idx { + if vring.add_used(head_idx, used_len as u32).is_err() { + warn!("Could not return used descriptors to ring"); } - Ok(needs_notification) => { - if needs_notification { + match vring.needs_notification() { + Err(_) => { + warn!("Could not check if queue needs to be notified"); vring.signal_used_queue().unwrap(); } + Ok(needs_notification) => { + if needs_notification { + vring.signal_used_queue().unwrap(); + } + } } + } else { + if vring.add_used(head_idx, used_len as u32).is_err() { + warn!("Could not return used descriptors to ring"); + } + vring.signal_used_queue().unwrap(); } - } else { - if vring.add_used(head_idx, used_len as u32).is_err() { - warn!("Could not return used descriptors to ring"); - } - vring.signal_used_queue().unwrap(); - } - }); + }); + } } Ok(used_any)