vsock: Move iter outside of while loop in process_rx_queue #411

Open · wants to merge 2 commits into base: main

crates/vsock/src/vhu_vsock_thread.rs — 238 changes: 125 additions & 113 deletions

```diff
@@ -462,76 +462,81 @@ impl VhostUserVsockThread {
         let queue = vring_mut.get_queue_mut();
 
-        while let Some(mut avail_desc) = queue
-            .iter(atomic_mem.memory())
-            .map_err(|_| Error::IterateQueue)?
-            .next()
-        {
-            used_any = true;
-            let mem = atomic_mem.clone().memory();
-
-            let head_idx = avail_desc.head_index();
-            let used_len = match VsockPacket::from_rx_virtq_chain(
-                mem.deref(),
-                &mut avail_desc,
-                self.tx_buffer_size,
-            ) {
-                Ok(mut pkt) => {
-                    let recv_result = match rx_queue_type {
-                        RxQueueType::Standard => self.thread_backend.recv_pkt(&mut pkt),
-                        RxQueueType::RawPkts => self.thread_backend.recv_raw_pkt(&mut pkt),
-                    };
-
-                    if recv_result.is_ok() {
-                        PKT_HEADER_SIZE + pkt.len() as usize
-                    } else {
-                        queue.iter(mem).unwrap().go_to_previous_position();
-                        break;
-                    }
-                }
-                Err(e) => {
-                    warn!("vsock: RX queue error: {:?}", e);
-                    0
-                }
-            };
-
-            let vring = vring.clone();
-            let event_idx = self.event_idx;
-
-            self.pool.spawn_ok(async move {
-                // TODO: Understand why doing the following in the pool works
-                if event_idx {
-                    if vring.add_used(head_idx, used_len as u32).is_err() {
-                        warn!("Could not return used descriptors to ring");
-                    }
-                    match vring.needs_notification() {
-                        Err(_) => {
-                            warn!("Could not check if queue needs to be notified");
-                            vring.signal_used_queue().unwrap();
-                        }
-                        Ok(needs_notification) => {
-                            if needs_notification {
-                                vring.signal_used_queue().unwrap();
-                            }
-                        }
-                    }
-                } else {
-                    if vring.add_used(head_idx, used_len as u32).is_err() {
-                        warn!("Could not return used descriptors to ring");
-                    }
-                    vring.signal_used_queue().unwrap();
-                }
-            });
-
-            match rx_queue_type {
-                RxQueueType::Standard => {
-                    if !self.thread_backend.pending_rx() {
-                        break;
-                    }
-                }
-                RxQueueType::RawPkts => {
-                    if !self.thread_backend.pending_raw_pkts() {
-                        break;
-                    }
-                }
-            }
-        }
+        let mut iter_has_elemnt = true;
```

Member commented on the added `iter_has_elemnt` variable:

> `iter_has_element` is less cryptic IMHO

Contributor (author) replied:

> Thank you for replying. I'll fix these.

```diff
+        while iter_has_elemnt {
+            let queue_iter = queue
+                .iter(atomic_mem.memory())
+                .map_err(|_| Error::IterateQueue)?;
+
+            iter_has_elemnt = false;
+            for mut avail_desc in queue_iter {
+                used_any = true;
+                iter_has_elemnt = true;
+                let mem = atomic_mem.clone().memory();
+
+                let head_idx = avail_desc.head_index();
+                let used_len = match VsockPacket::from_rx_virtq_chain(
+                    mem.deref(),
+                    &mut avail_desc,
+                    self.tx_buffer_size,
+                ) {
+                    Ok(mut pkt) => {
+                        let recv_result = match rx_queue_type {
+                            RxQueueType::Standard => self.thread_backend.recv_pkt(&mut pkt),
+                            RxQueueType::RawPkts => self.thread_backend.recv_raw_pkt(&mut pkt),
+                        };
+
+                        if recv_result.is_ok() {
+                            PKT_HEADER_SIZE + pkt.len() as usize
+                        } else {
+                            queue.iter(mem).unwrap().go_to_previous_position();
+                            break;
```

Member commented on the `break` inside the new `for` loop:

> Now that we have 2 loops nested, IIUC this should exit both loops, so we may need to replace it with a `return Ok(used_any)`, or set `iter_has_elemnt` to false.
>
> If you prefer the latter, maybe it would be better to rename that variable to something like `processing_rx`.
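
For illustration only — a minimal, self-contained Rust sketch (hypothetical names, not code from this PR) of the control-flow point above: a bare `break` leaves only the inner `for`, so the outer `while` spins again unless the function returns early, the flag is cleared first, or a labeled break is used:

```rust
fn process_all(batches: &[&[i32]]) -> Result<bool, ()> {
    let mut used_any = false;
    let mut iter_has_element = true;
    let mut pass = 0;

    // Mirrors the PR's outer `while` + inner `for` shape.
    'outer: while iter_has_element {
        iter_has_element = false;
        for &item in batches.get(pass).copied().unwrap_or(&[]) {
            iter_has_element = true;
            used_any = true;
            if item < 0 {
                // A bare `break` would only exit this `for`; with
                // `iter_has_element` already true, the `while` would run
                // again. The reviewer's alternatives both exit fully:
                //   return Ok(used_any);        // early return
                //   iter_has_element = false;   // clear flag, then `break`
                break 'outer; // a labeled break also leaves both loops
            }
        }
        pass += 1;
    }
    Ok(used_any)
}

fn main() {
    // Stops at the first negative item without re-entering the outer loop.
    assert_eq!(process_all(&[&[1, 2], &[-1, 3]]), Ok(true));
}
```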

```diff
+                        }
+                    }
+                    Err(e) => {
+                        warn!("vsock: RX queue error: {:?}", e);
+                        0
+                    }
+                };
+
+                let vring = vring.clone();
+                let event_idx = self.event_idx;
+
+                self.pool.spawn_ok(async move {
+                    // TODO: Understand why doing the following in the pool works
+                    if event_idx {
+                        if vring.add_used(head_idx, used_len as u32).is_err() {
+                            warn!("Could not return used descriptors to ring");
+                        }
+                        match vring.needs_notification() {
+                            Err(_) => {
+                                warn!("Could not check if queue needs to be notified");
+                                vring.signal_used_queue().unwrap();
+                            }
+                            Ok(needs_notification) => {
+                                if needs_notification {
+                                    vring.signal_used_queue().unwrap();
+                                }
+                            }
+                        }
+                    } else {
+                        if vring.add_used(head_idx, used_len as u32).is_err() {
+                            warn!("Could not return used descriptors to ring");
+                        }
+                        vring.signal_used_queue().unwrap();
+                    }
+                });
+
+                match rx_queue_type {
+                    RxQueueType::Standard => {
+                        if !self.thread_backend.pending_rx() {
+                            break;
```

Member commented on the `break` under `pending_rx()`:

> Same here: I think the original idea was to stop processing new descriptors, since we don't have any packet to copy into them.

```diff
+                        }
+                    }
+                    RxQueueType::RawPkts => {
+                        if !self.thread_backend.pending_raw_pkts() {
+                            break;
```

Member commented on the `break` under `pending_raw_pkts()`:

> Ditto.

```diff
+                        }
+                    }
+                }
+            }
+        }
```

```diff
@@ -616,68 +616,75 @@ impl VhostUserVsockThread {
             None => return Err(Error::NoMemoryConfigured),
         };
 
-        while let Some(mut avail_desc) = vring
-            .get_mut()
-            .get_queue_mut()
-            .iter(atomic_mem.memory())
-            .map_err(|_| Error::IterateQueue)?
-            .next()
-        {
-            used_any = true;
-            let mem = atomic_mem.clone().memory();
-
-            let head_idx = avail_desc.head_index();
-            let pkt = match VsockPacket::from_tx_virtq_chain(
-                mem.deref(),
-                &mut avail_desc,
-                self.tx_buffer_size,
-            ) {
-                Ok(pkt) => pkt,
-                Err(e) => {
-                    dbg!("vsock: error reading TX packet: {:?}", e);
-                    continue;
-                }
-            };
-
-            if self.thread_backend.send_pkt(&pkt).is_err() {
-                vring
-                    .get_mut()
-                    .get_queue_mut()
-                    .iter(mem)
-                    .unwrap()
-                    .go_to_previous_position();
-                break;
-            }
-
-            // TODO: Check if the protocol requires read length to be correct
-            let used_len = 0;
-
-            let vring = vring.clone();
-            let event_idx = self.event_idx;
-
-            self.pool.spawn_ok(async move {
-                if event_idx {
-                    if vring.add_used(head_idx, used_len as u32).is_err() {
-                        warn!("Could not return used descriptors to ring");
-                    }
-                    match vring.needs_notification() {
-                        Err(_) => {
-                            warn!("Could not check if queue needs to be notified");
-                            vring.signal_used_queue().unwrap();
-                        }
-                        Ok(needs_notification) => {
-                            if needs_notification {
-                                vring.signal_used_queue().unwrap();
-                            }
-                        }
-                    }
-                } else {
-                    if vring.add_used(head_idx, used_len as u32).is_err() {
-                        warn!("Could not return used descriptors to ring");
-                    }
-                    vring.signal_used_queue().unwrap();
-                }
-            });
-        }
+        let mut vring_mut = vring.get_mut();
+
+        let queue = vring_mut.get_queue_mut();
+
+        let mut iter_has_elemnt = true;
+        while iter_has_elemnt {
+            let queue_iter = queue
+                .iter(atomic_mem.memory())
+                .map_err(|_| Error::IterateQueue)?;
+
+            iter_has_elemnt = false;
+            for mut avail_desc in queue_iter {
+                iter_has_elemnt = true;
+                used_any = true;
+                let mem = atomic_mem.clone().memory();
+
+                let head_idx = avail_desc.head_index();
+                let pkt = match VsockPacket::from_tx_virtq_chain(
+                    mem.deref(),
+                    &mut avail_desc,
+                    self.tx_buffer_size,
+                ) {
+                    Ok(pkt) => pkt,
+                    Err(e) => {
+                        dbg!("vsock: error reading TX packet: {:?}", e);
+                        continue;
+                    }
+                };
+
+                if self.thread_backend.send_pkt(&pkt).is_err() {
+                    vring
+                        .get_mut()
+                        .get_queue_mut()
+                        .iter(mem)
+                        .unwrap()
+                        .go_to_previous_position();
+                    break;
```

Member commented on the `break` after a failed `send_pkt()`:

> And maybe also here.

```diff
+                }
+
+                // TODO: Check if the protocol requires read length to be correct
+                let used_len = 0;
+
+                let vring = vring.clone();
+                let event_idx = self.event_idx;
+
+                self.pool.spawn_ok(async move {
+                    if event_idx {
+                        if vring.add_used(head_idx, used_len as u32).is_err() {
+                            warn!("Could not return used descriptors to ring");
+                        }
+                        match vring.needs_notification() {
+                            Err(_) => {
+                                warn!("Could not check if queue needs to be notified");
+                                vring.signal_used_queue().unwrap();
+                            }
+                            Ok(needs_notification) => {
+                                if needs_notification {
+                                    vring.signal_used_queue().unwrap();
+                                }
+                            }
+                        }
+                    } else {
+                        if vring.add_used(head_idx, used_len as u32).is_err() {
+                            warn!("Could not return used descriptors to ring");
+                        }
+                        vring.signal_used_queue().unwrap();
+                    }
+                });
+            }
+        }
 
         Ok(used_any)
```
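
Stepping back, both hunks replace a per-descriptor `while let ... .next()` with one iterator per outer pass, looping until a pass yields no descriptors. A standalone toy of that drain-until-empty shape (the types below are hypothetical stand-ins, not the virtio queue API):

```rust
use std::collections::VecDeque;

// `ring` stands in for the available ring; `incoming` simulates descriptors
// that show up while a pass is being processed.
fn drain_until_empty(ring: &mut VecDeque<u32>, incoming: &mut Vec<Vec<u32>>) -> bool {
    let mut used_any = false;
    let mut iter_has_element = true;

    while iter_has_element {
        iter_has_element = false;
        // One drain per outer pass, mirroring the single
        // `queue.iter(atomic_mem.memory())` call per `while` iteration.
        while let Some(_desc) = ring.pop_front() {
            iter_has_element = true;
            used_any = true;
        }
        // New descriptors may have arrived during the pass; the next
        // outer iteration picks them up.
        if let Some(batch) = incoming.pop() {
            ring.extend(batch);
        }
    }
    used_any
}

fn main() {
    let mut ring: VecDeque<u32> = VecDeque::from(vec![1, 2, 3]);
    let mut incoming = vec![vec![4, 5]];
    assert!(drain_until_empty(&mut ring, &mut incoming));
    assert!(ring.is_empty());
}
```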