diff --git a/src/rust/catnap/win/socket.rs b/src/rust/catnap/win/socket.rs index 6f259d965..d8a664f80 100644 --- a/src/rust/catnap/win/socket.rs +++ b/src/rust/catnap/win/socket.rs @@ -101,6 +101,19 @@ const SOCKADDR_BUF_SIZE: usize = std::mem::size_of::() + 16; // AcceptEx buffer returns two addresses. const ACCEPT_BUFFER_LEN: usize = (SOCKADDR_BUF_SIZE) * 2; +//====================================================================================================================== +// Enums +//====================================================================================================================== + +/// Helper structure for converting between DemiBuffers and WSABUF slices. +enum WsaBufs { + /// Only one buffer present. + One(WSABUF), + + /// More than one buffer in a chain. Buffers are sequential for vectored I/O. + Many(Vec), +} + //====================================================================================================================== // Structures //====================================================================================================================== @@ -129,6 +142,40 @@ pub struct PopState { // Associated Functions //====================================================================================================================== +impl WsaBufs { + /// Convert a single DemiBuffer into a WSABUF. + fn convert_buf(buf: &mut DemiBuffer) -> WSABUF { + WSABUF { + len: buf.len() as u32, + buf: PSTR::from_raw(buf.as_mut_ptr()), + } + } + + /// Map one or more DemiBuffers in a chain into a WsaBufs enum. + pub fn new(buf: &mut DemiBuffer) -> WsaBufs { + if buf.is_multi_segment() { + let result: Vec = std::iter::once(Self::convert_buf(buf)) + .chain( + std::iter::successors(buf.next(), DemiBuffer::next) + .map(|mut buf: DemiBuffer| Self::convert_buf(&mut buf)), + ) + .collect::>(); + + WsaBufs::Many(result) + } else { + WsaBufs::One(Self::convert_buf(buf)) + } + } + + /// Translate the WsaBufs enum into a slice of WSABUFs usable by the windows API. + pub fn as_slice(&self) -> &[WSABUF] { + match self { + WsaBufs::One(buf) => std::slice::from_ref(buf), + WsaBufs::Many(vec) => vec.as_slice(), + } + } +} + impl AcceptState { /// Create a new, empty `AcceptState``. pub fn new() -> Self { @@ -453,17 +500,14 @@ impl Socket { let mut bytes_transferred: u32 = 0; let mut flags: u32 = 0; let success: bool = unsafe { - let wsa_buffer: WSABUF = WSABUF { - len: pop_state.buffer.len() as u32, - // Safety: loading the buffer pointer won't violate pinning invariants. - buf: PSTR::from_raw(pop_state.as_mut().get_unchecked_mut().buffer.as_mut_ptr()), - }; + // Safety: loading the buffer pointer won't violate pinning invariants. + let wsa_bufs: WsaBufs = WsaBufs::new(&mut pop_state.as_mut().get_unchecked_mut().buffer); // NB winsock service providers are required to capture the entire WSABUF array inline with the call, so // wsa_buffer and the derivative slice can safely drop after the call. let result: i32 = WSARecvFrom( self.s, - std::slice::from_ref(&wsa_buffer), + wsa_bufs.as_slice(), Some(&mut bytes_transferred), &mut flags, Some(pop_state.as_mut().get_unchecked_mut().address.as_mut_ptr() as *mut SOCKADDR), @@ -521,11 +565,8 @@ impl Socket { ) -> Result<(), Fail> { let mut bytes_transferred: u32 = 0; let success: bool = unsafe { - let wsa_buffer: WSABUF = WSABUF { - len: buffer.len() as u32, - // Safety: loading the buffer pointer won't violate pinning invariants. - buf: PSTR::from_raw(buffer.get_unchecked_mut().as_mut_ptr()), - }; + // Safety: loading the buffer pointer won't violate pinning invariants. + let wsa_bufs: WsaBufs = WsaBufs::new(buffer.get_unchecked_mut()); let addr: Option = addr.map(socket2::SockAddr::from); @@ -535,7 +576,7 @@ impl Socket { // functions equivalently to WSASend. let result: i32 = WSASendTo( self.s, - std::slice::from_ref(&wsa_buffer), + wsa_bufs.as_slice(), Some(&mut bytes_transferred), 0, addr.as_ref() diff --git a/src/rust/runtime/memory/demibuffer.rs b/src/rust/runtime/memory/demibuffer.rs index 483c42bfd..9d55751cd 100644 --- a/src/rust/runtime/memory/demibuffer.rs +++ b/src/rust/runtime/memory/demibuffer.rs @@ -179,7 +179,7 @@ impl MetaData { while md.next.is_some() { // Safety: The call to as_mut is safe, as the pointer is aligned and dereferenceable, and the MetaData // struct it points to is initialized properly. - md = unsafe { md.next.unwrap().as_mut() }; + md = unsafe { md.next.as_mut().unwrap().as_mut() }; } &mut *md } @@ -251,7 +251,7 @@ impl DemiBuffer { // Implementation Note: // This function is replacing the new() function of DataBuffer, which could return failure. However, the only - // failure it actually reported was if the new DataBuffer request was for zero size. A seperate empty() function + // failure it actually reported was if the new DataBuffer request was for zero size. A separate empty() function // was provided to allocate zero-size buffers. This new implementation does not have a special case for this, // instead, zero is a valid argument to new(). So we no longer need the failure return case of this function. // @@ -292,7 +292,7 @@ impl DemiBuffer { } // Embed the buffer type into the lower bits of the pointer. - let tagged: NonNull = temp.with_addr(temp.addr() | Tag::Heap); + let tagged: NonNull = tag_ptr(temp, Tag::Heap); // Return the new DemiBuffer. DemiBuffer { @@ -324,7 +324,7 @@ impl DemiBuffer { pub unsafe fn from_mbuf(mbuf_ptr: *mut rte_mbuf) -> Self { // Convert the raw pointer into a NonNull and add a tag indicating it is a DPDK buffer (i.e. a MBuf). let temp: NonNull = NonNull::new_unchecked(mbuf_ptr as *mut _); - let tagged: NonNull = temp.with_addr(temp.addr() | Tag::Dpdk); + let tagged: NonNull = tag_ptr(temp, Tag::Dpdk); DemiBuffer { tagged_ptr: tagged, @@ -353,6 +353,41 @@ impl DemiBuffer { self.as_metadata().data_len as usize } + /// Get the next buffer in the chain, if any. + pub fn next(&self) -> Option { + match self.get_tag() { + Tag::Heap => match self.as_metadata().next { + Some(metadata) => { + let next: DemiBuffer = Self { + tagged_ptr: tag_ptr(metadata, self.get_tag()), + _phantom: PhantomData, + }; + next.as_metadata().inc_refcnt(); + Some(next) + }, + + None => None, + }, + + #[cfg(feature = "libdpdk")] + Tag::Dpdk => { + let mbuf: *mut rte_mbuf = self.as_mbuf(); + let next_mbuf: *mut rte_mbuf = unsafe { + // Safety: The `mbuf` dereference below is safe, as it is aligned and dereferenceable. + (*mbuf).next + }; + + if next_mbuf == std::ptr::null_mut() { + None + } else { + let result: DemiBuffer = DemiBuffer::from_mbuf(next_mbuf); + result.as_metadata().inc_refcnt(); + Some(result) + } + }, + } + } + /// Removes `nbytes` bytes from the beginning of the `DemiBuffer` chain. // Note: If `nbytes` is greater than the length of the first segment in the chain, then this function will fail and // return an error, rather than remove the remaining bytes from subsequent segments in the chain. This is to match @@ -630,7 +665,7 @@ impl DemiBuffer { /// /// If the target [DemiBuffer] has multiple segments, `true` is returned. Otherwise, `false` is returned instead. /// - fn is_multi_segment(&self) -> bool { + pub fn is_multi_segment(&self) -> bool { match self.get_tag() { Tag::Heap => { let md_front: &MetaData = self.as_metadata(); @@ -650,6 +685,11 @@ impl DemiBuffer { // Helper Functions // ---------------- +// Add a tag to a MetaData address. +fn tag_ptr(metadata: NonNull, tag: Tag) -> NonNull { + metadata.with_addr(metadata.addr() | tag) +} + // Allocates the MetaData (plus the space for any directly attached data) for a new heap-allocated DemiBuffer. fn allocate_metadata_data(direct_data_size: u16) -> NonNull { // We need space for the MetaData struct, plus any extra memory for directly attached data. @@ -793,7 +833,7 @@ impl Clone for DemiBuffer { } // Embed the buffer type into the lower bits of the pointer. - let tagged: NonNull = head.with_addr(head.addr() | Tag::Heap); + let tagged: NonNull = tag_ptr(head, Tag::Heap); // Return the new DemiBuffer. DemiBuffer { @@ -983,7 +1023,7 @@ impl TryFrom<&[u8]> for DemiBuffer { } // Embed the buffer type into the lower bits of the pointer. - let tagged: NonNull = temp.with_addr(temp.addr() | Tag::Heap); + let tagged: NonNull = tag_ptr(temp, Tag::Heap); // Return the new DemiBuffer. Ok(DemiBuffer {