diff --git a/src/rust/catpowder/mod.rs b/src/rust/catpowder/mod.rs index cac3e9ac5..009df7b15 100644 --- a/src/rust/catpowder/mod.rs +++ b/src/rust/catpowder/mod.rs @@ -5,10 +5,16 @@ mod win; #[cfg(target_os = "windows")] -pub use win::runtime::SharedCatpowderRuntime; +pub use win::{ + runtime::SharedCatpowderRuntime, + transport::SharedCatpowderTransport, +}; #[cfg(target_os = "linux")] mod linux; #[cfg(target_os = "linux")] pub use linux::LinuxRuntime as SharedCatpowderRuntime; + +#[cfg(target_os = "linux")] +pub use crate::inetstack::SharedInetStack as SharedCatpowderTransport; diff --git a/src/rust/catpowder/win/mod.rs b/src/rust/catpowder/win/mod.rs index 908f36376..d152d3ff5 100644 --- a/src/rust/catpowder/win/mod.rs +++ b/src/rust/catpowder/win/mod.rs @@ -14,3 +14,4 @@ mod socket; //====================================================================================================================== pub mod runtime; +pub mod transport; diff --git a/src/rust/catpowder/win/ring/rule/program.rs b/src/rust/catpowder/win/ring/rule/program.rs index 0c9d76ee7..13403409c 100644 --- a/src/rust/catpowder/win/ring/rule/program.rs +++ b/src/rust/catpowder/win/ring/rule/program.rs @@ -40,6 +40,15 @@ impl XdpProgram { let rule_count: u32 = rules.len() as u32; let mut handle: HANDLE = HANDLE::default(); + trace!( + "new(): rules={:?}, ifindex={}, hookid={:?}, queueid={}, flags={}", + rules, + ifindex, + hookid, + queueid, + flags + ); + // Attempt to create the XDP program. if let Some(create_program) = api.get().XdpCreateProgram { let result: HRESULT = unsafe { diff --git a/src/rust/catpowder/win/ring/rule/rule.rs b/src/rust/catpowder/win/ring/rule/rule.rs index 4fdf8ab89..3ace570de 100644 --- a/src/rust/catpowder/win/ring/rule/rule.rs +++ b/src/rust/catpowder/win/ring/rule/rule.rs @@ -7,9 +7,13 @@ use crate::{ catpowder::win::{ring::rule::params::XdpRedirectParams, socket::XdpSocket}, + inetstack::protocols::Protocol, runtime::libxdp, }; -use ::std::mem; +use ::std::{ + fmt::{self, Formatter}, + mem, +}; //====================================================================================================================== // Structures @@ -24,7 +28,8 @@ pub struct XdpRule(libxdp::XDP_RULE); //====================================================================================================================== impl XdpRule { - /// Creates a new XDP rule for the target socket. + /// Creates a new XDP rule for the target socket which filters all traffic. + #[allow(dead_code)] pub fn new(socket: &XdpSocket) -> Self { let redirect: XdpRedirectParams = XdpRedirectParams::new(socket); let rule: libxdp::XDP_RULE = unsafe { @@ -39,4 +44,61 @@ impl XdpRule { }; Self(rule) } + + /// Creates a new XDP rule for the target socket which filters for a specific (protocol, port) combination. + pub fn new_for_dest(socket: &XdpSocket, protocol: Protocol, port: u16) -> Self { + let redirect: XdpRedirectParams = XdpRedirectParams::new(socket); + let rule: libxdp::XDP_RULE = unsafe { + let mut rule: libxdp::XDP_RULE = std::mem::zeroed(); + rule.Match = match protocol { + Protocol::Udp => libxdp::_XDP_MATCH_TYPE_XDP_MATCH_UDP_DST, + Protocol::Tcp => libxdp::_XDP_MATCH_TYPE_XDP_MATCH_TCP_DST, + }; + rule.Action = libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_REDIRECT; + *rule.Pattern.Port.as_mut() = port; + // Perform bitwise copy from redirect to rule, as this is a union field. + *rule.__bindgen_anon_1.Redirect.as_mut() = mem::transmute_copy(redirect.as_ref()); + + rule + }; + Self(rule) + } +} + +//====================================================================================================================== +// Trait Implementations +//====================================================================================================================== + +impl fmt::Debug for XdpRule { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let match_str: &str = match self.0.Match { + libxdp::_XDP_MATCH_TYPE_XDP_MATCH_UDP_DST => "XDP_MATCH_UDP_DST", + libxdp::_XDP_MATCH_TYPE_XDP_MATCH_TCP_DST => "XDP_MATCH_TCP_DST", + libxdp::_XDP_MATCH_TYPE_XDP_MATCH_ALL => "XDP_MATCH_ALL", + _ => "UNKNOWN", + }; + + let pattern: String = match self.0.Match { + libxdp::_XDP_MATCH_TYPE_XDP_MATCH_UDP_DST | libxdp::_XDP_MATCH_TYPE_XDP_MATCH_TCP_DST => { + format!("{{Port={}}}", unsafe { self.0.Pattern.Port.as_ref() }) + }, + libxdp::_XDP_MATCH_TYPE_XDP_MATCH_ALL => format!("{{ALL}}"), + _ => format!("{{UNKNOWN}}"), + }; + + let action_str: &str = match self.0.Action { + libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_REDIRECT => "XDP_PROGRAM_ACTION_REDIRECT", + libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_EBPF => "XDP_PROGRAM_ACTION_EBPF", + libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_L2FWD => "XDP_PROGRAM_ACTION_L2FWD", + libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_PASS => "XDP_PROGRAM_ACTION_PASS", + libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_DROP => "XDP_PROGRAM_ACTION_DROP", + _ => "UNKNOWN", + }; + + f.debug_struct("XDP_RULE") + .field("Match", &match_str) + .field("Pattern", &pattern) + .field("Action", &action_str) + .finish() + } } diff --git a/src/rust/catpowder/win/ring/rx_ring.rs b/src/rust/catpowder/win/ring/rx_ring.rs index f279aef32..3b55546e7 100644 --- a/src/rust/catpowder/win/ring/rx_ring.rs +++ b/src/rust/catpowder/win/ring/rx_ring.rs @@ -16,6 +16,7 @@ use crate::{ }, socket::XdpSocket, }, + inetstack::protocols::Protocol, runtime::{fail::Fail, libxdp, limits}, }; use ::std::{cell::RefCell, rc::Rc}; @@ -26,6 +27,10 @@ use ::std::{cell::RefCell, rc::Rc}; /// A ring for receiving packets. pub struct RxRing { + /// Index of the interface for the ring. + ifindex: u32, + /// Index of the queue for the ring. + queueid: u32, /// A user memory region where receive buffers are stored. mem: Rc>, /// A ring for receiving packets. @@ -33,9 +38,9 @@ pub struct RxRing { /// A ring for returning receive buffers to the kernel. rx_fill_ring: XdpRing, /// Underlying XDP socket. - _socket: XdpSocket, // NOTE: we keep this here to prevent the socket from being dropped. + socket: XdpSocket, // NOTE: we keep this here to prevent the socket from being dropped. /// Underlying XDP program. - _program: XdpProgram, // NOTE: we keep this here to prevent the program from being dropped. + _program: Option, // NOTE: we keep this here to prevent the program from being dropped. } //====================================================================================================================== @@ -50,7 +55,11 @@ impl RxRing { let mut socket: XdpSocket = XdpSocket::create(api)?; // Create a UMEM region. - trace!("creating umem region"); + trace!( + "creating umem region; count={}, chunk_size={}", + length, + limits::RECVBUF_SIZE_MAX as u32 + ); let mem: Rc> = Rc::new(RefCell::new(UmemReg::new(length, limits::RECVBUF_SIZE_MAX as u32))); // Register the UMEM region. @@ -58,30 +67,30 @@ impl RxRing { socket.setsockopt( api, libxdp::XSK_SOCKOPT_UMEM_REG, - mem.borrow().as_ref() as *const libxdp::XSK_UMEM_REG as *const core::ffi::c_void, + mem.borrow().as_ref(), std::mem::size_of::() as u32, )?; // Set rx ring size. - trace!("setting rx ring size"); + trace!("setting rx ring size: {}", length); socket.setsockopt( api, libxdp::XSK_SOCKOPT_RX_RING_SIZE, - &length as *const u32 as *const core::ffi::c_void, + &length, std::mem::size_of::() as u32, )?; // Set rx fill ring size. - trace!("setting rx fill ring size"); + trace!("setting rx fill ring size: {}", length); socket.setsockopt( api, libxdp::XSK_SOCKOPT_RX_FILL_RING_SIZE, - &length as *const u32 as *const core::ffi::c_void, + &length, std::mem::size_of::() as u32, )?; // Bind the rx queue. - trace!("binding rx queue"); + trace!("binding rx queue for interface {}, queue {}", ifindex, queueid); socket.bind(api, ifindex, queueid, libxdp::_XSK_BIND_FLAGS_XSK_BIND_FLAG_RX)?; // Activate socket to enable packet reception. @@ -92,12 +101,9 @@ impl RxRing { trace!("retrieving rx ring info"); let mut ring_info: libxdp::XSK_RING_INFO_SET = unsafe { std::mem::zeroed() }; let mut option_length: u32 = std::mem::size_of::() as u32; - socket.getsockopt( - api, - libxdp::XSK_SOCKOPT_RING_INFO, - &mut ring_info as *mut libxdp::XSK_RING_INFO_SET as *mut core::ffi::c_void, - &mut option_length as *mut u32, - )?; + socket.getsockopt(api, libxdp::XSK_SOCKOPT_RING_INFO, &mut ring_info, &mut option_length)?; + + trace!("rx ring info: {:?}", ring_info); // Initialize rx and rx fill rings. let mut rx_fill_ring: XdpRing = XdpRing::new(&ring_info.Fill); @@ -111,25 +117,45 @@ impl RxRing { unsafe { *b = 0 }; rx_fill_ring.producer_submit(length); + Ok(Self { + ifindex, + queueid, + mem, + rx_ring, + rx_fill_ring, + socket, + _program: None, + }) + } + + /// Update the RxRing to use the specified rules for filtering. + pub fn reprogram(&mut self, api: &mut XdpApi, rules: &[(Protocol, u16)]) -> Result<(), Fail> { // Create XDP program. - trace!("creating xdp program"); + trace!( + "creating xdp program for interface {}, queue {} with the rules: {:?}", + self.ifindex, + self.queueid, + rules, + ); const XDP_INSPECT_RX: libxdp::XDP_HOOK_ID = libxdp::XDP_HOOK_ID { Layer: libxdp::_XDP_HOOK_LAYER_XDP_HOOK_L2, Direction: libxdp::_XDP_HOOK_DATAPATH_DIRECTION_XDP_HOOK_RX, SubLayer: libxdp::_XDP_HOOK_SUBLAYER_XDP_HOOK_INSPECT, }; - let rules: Vec = vec![XdpRule::new(&socket)]; - let program: XdpProgram = XdpProgram::new(api, &rules, ifindex, &XDP_INSPECT_RX, queueid, 0)?; - - trace!("xdp program created"); - - Ok(Self { - mem, - rx_ring, - rx_fill_ring, - _socket: socket, - _program: program, - }) + let mut xdp_rules: Vec = Vec::with_capacity(rules.len()); + for (protocol, port) in rules.iter() { + xdp_rules.push(XdpRule::new_for_dest(&self.socket, *protocol, *port)); + } + + let program: XdpProgram = XdpProgram::new(api, &xdp_rules, self.ifindex, &XDP_INSPECT_RX, self.queueid, 0)?; + trace!( + "xdp program created for interface {}, queue {}", + self.ifindex, + self.queueid + ); + + self._program = Some(program); + Ok(()) } /// Reserves a consumer slot in the rx ring. diff --git a/src/rust/catpowder/win/ring/tx_ring.rs b/src/rust/catpowder/win/ring/tx_ring.rs index 6312dc0f3..4088183c7 100644 --- a/src/rust/catpowder/win/ring/tx_ring.rs +++ b/src/rust/catpowder/win/ring/tx_ring.rs @@ -47,7 +47,7 @@ impl TxRing { socket.setsockopt( api, libxdp::XSK_SOCKOPT_UMEM_REG, - mem.borrow().as_ref() as *const libxdp::XSK_UMEM_REG as *const core::ffi::c_void, + mem.borrow().as_ref(), std::mem::size_of::() as u32, )?; @@ -56,7 +56,7 @@ impl TxRing { socket.setsockopt( api, libxdp::XSK_SOCKOPT_TX_RING_SIZE, - &length as *const u32 as *const core::ffi::c_void, + &length, std::mem::size_of::() as u32, )?; @@ -65,7 +65,7 @@ impl TxRing { socket.setsockopt( api, libxdp::XSK_SOCKOPT_TX_COMPLETION_RING_SIZE, - &length as *const u32 as *const core::ffi::c_void, + &length, std::mem::size_of::() as u32, )?; @@ -81,12 +81,7 @@ impl TxRing { trace!("retrieving tx ring info"); let mut ring_info: libxdp::XSK_RING_INFO_SET = unsafe { std::mem::zeroed() }; let mut option_length: u32 = std::mem::size_of::() as u32; - socket.getsockopt( - api, - libxdp::XSK_SOCKOPT_RING_INFO, - &mut ring_info as *mut libxdp::XSK_RING_INFO_SET as *mut core::ffi::c_void, - &mut option_length as *mut u32, - )?; + socket.getsockopt(api, libxdp::XSK_SOCKOPT_RING_INFO, &mut ring_info, &mut option_length)?; // Initialize tx and tx completion rings. let tx_ring: XdpRing = XdpRing::new(&ring_info.Tx); diff --git a/src/rust/catpowder/win/ring/umemreg.rs b/src/rust/catpowder/win/ring/umemreg.rs index 7f796f77b..4539bcd82 100644 --- a/src/rust/catpowder/win/ring/umemreg.rs +++ b/src/rust/catpowder/win/ring/umemreg.rs @@ -43,3 +43,13 @@ impl UmemReg { self.umem.Address } } + +//====================================================================================================================== +// Trait Implementations +//====================================================================================================================== + +impl std::fmt::Debug for UmemReg { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.umem.fmt(f) + } +} diff --git a/src/rust/catpowder/win/runtime.rs b/src/rust/catpowder/win/runtime.rs index e758dbc0b..0b9fb7e09 100644 --- a/src/rust/catpowder/win/runtime.rs +++ b/src/rust/catpowder/win/runtime.rs @@ -12,7 +12,7 @@ use crate::{ }, demi_sgarray_t, demi_sgaseg_t, demikernel::config::Config, - inetstack::protocols::{layer1::PhysicalLayer, MAX_HEADER_SIZE}, + inetstack::protocols::{layer1::PhysicalLayer, layer4::Socket, Protocol, MAX_HEADER_SIZE}, runtime::{ fail::Fail, libxdp, @@ -24,6 +24,7 @@ use crate::{ use ::arrayvec::ArrayVec; use ::libc::c_void; use ::std::{borrow::BorrowMut, mem}; +use std::{borrow::Borrow, net::SocketAddr}; use windows::Win32::{ Foundation::ERROR_INSUFFICIENT_BUFFER, System::SystemInformation::{ @@ -44,6 +45,7 @@ struct CatpowderRuntimeInner { api: XdpApi, tx: TxRing, rx_rings: Vec, + rules: Vec<(Protocol, u16)>, } //====================================================================================================================== // Implementations @@ -74,7 +76,90 @@ impl SharedCatpowderRuntime { } trace!("Created {} RX rings.", rx_rings.len()); - Ok(Self(SharedObject::new(CatpowderRuntimeInner { api, tx, rx_rings }))) + Ok(Self(SharedObject::new(CatpowderRuntimeInner { + api, + tx, + rx_rings, + rules: vec![], + }))) + } + + /// Reprograms the runtime with the current rule set. If new_rules is None, the current ruleset + /// is used. If roll_back is true, the ruleset for each updated queue is rolled back to the + /// previous rule set if any error occurs during reprogramming. stop_idx is the index of the + /// last queue to reprogram, useful when rolling back partially updated rule sets. + fn reprogram( + &mut self, + new_rules: Option<&Vec<(Protocol, u16)>>, + roll_back: bool, + stop_idx: usize, + ) -> Result<(), Fail> { + let inner_self: &mut CatpowderRuntimeInner = self.0.borrow_mut(); + let mut err: Option = None; + trace!( + "reprogram(): new_rules: {:?}; roll_back: {}, stop_idx: {}", + new_rules.unwrap_or(&inner_self.rules), + roll_back, + stop_idx + ); + + for (idx, rx) in inner_self.rx_rings.iter_mut().take(stop_idx).enumerate() { + if let Err(e) = rx.reprogram(&mut inner_self.api, &new_rules.unwrap_or(&inner_self.rules)) { + error!("Failed to reprogram rule set for queue {}: {:?}", idx, e); + if roll_back { + error!("Rolling back to previous XDP rule set"); + if let Err(sub_err) = self.reprogram(None, false, idx) { + error!("Failed to roll back rule set: {:?}", sub_err); + } + return Err(e); + } else { + err = Some(e); + } + } + } + + err.map_or(Ok(()), |e| Err(e)) + } + + /// Updates the rule set to a new rule vector, triggering a reprogram of the XDP runtime. If + /// roll_back is false, the rule set is updated even if the reprogram fails. + fn update_rule_set(&mut self, new_rules: Vec<(Protocol, u16)>, roll_back: bool) -> Result<(), Fail> { + let result: Result<(), Fail> = self.reprogram(Some(&new_rules), roll_back, self.0.borrow().rx_rings.len()); + if result.is_ok() || !roll_back { + self.0.borrow_mut().rules = new_rules; + } + result + } + + /// Adds a rule to the current rule set, triggering a reprogram of the XDP runtime. + fn add_rule(&mut self, protocol: Protocol, port: u16) -> Result<(), Fail> { + trace!("add_rule(): protocol={:?}, port={}", protocol, port); + + let inner_self: &mut CatpowderRuntimeInner = self.0.borrow_mut(); + let mut rules: Vec<(Protocol, u16)> = inner_self.rules.clone(); + rules.push((protocol, port)); + self.update_rule_set(rules, true) + } + + /// Removes a rule from the current rule set, triggering a reprogram of the XDP runtime. + pub fn remove_rule(&mut self, protocol: Protocol, port: u16) -> Result<(), Fail> { + trace!("remove_rule(): protocol={:?}, port={}", protocol, port); + + let inner_self: &mut CatpowderRuntimeInner = self.0.borrow_mut(); + let mut rules: Vec<(Protocol, u16)> = inner_self.rules.clone(); + rules.retain(|(p, prt)| *p != protocol || *prt != port); + self.update_rule_set(rules, false) + } + + /// Add a rule for a port binding, triggering a reprogram of the XDP runtime. On success, + pub fn bind(&mut self, socket: &Socket, local: SocketAddr) -> Result<(Protocol, u16), Fail> { + let protocol: Protocol = match socket { + Socket::Udp(_) => Protocol::Udp, + Socket::Tcp(_) => Protocol::Tcp, + }; + + self.add_rule(protocol, local.port())?; + Ok((protocol, local.port())) } } diff --git a/src/rust/catpowder/win/socket.rs b/src/rust/catpowder/win/socket.rs index 108dde819..71757a07e 100644 --- a/src/rust/catpowder/win/socket.rs +++ b/src/rust/catpowder/win/socket.rs @@ -50,6 +50,8 @@ impl XdpSocket { pub fn bind(&self, api: &mut XdpApi, ifindex: u32, queueid: u32, flags: i32) -> Result<(), Fail> { let api: libxdp::XDP_API_TABLE = api.get(); + trace!("bind(): ifindex={}, queueid={}, flags={}", ifindex, queueid, flags); + if let Some(bind) = api.XskBind { let result: HRESULT = unsafe { bind(self.0, ifindex, queueid, flags) }; let error: windows::core::Error = windows::core::Error::from_hresult(result); @@ -68,17 +70,16 @@ impl XdpSocket { } /// Set options in the target socket. - pub fn setsockopt( - &mut self, - api: &mut XdpApi, - opt: u32, - val: *const std::ffi::c_void, - len: u32, - ) -> Result<(), Fail> { + pub fn setsockopt(&mut self, api: &mut XdpApi, opt: u32, val: &T, len: u32) -> Result<(), Fail> + where + T: std::fmt::Debug, + { let api: libxdp::XDP_API_TABLE = api.get(); + trace!("setsockopt(): opt={}, val={:?}, len={}", opt, val, len); + if let Some(setsocket) = api.XskSetSockopt { - let result: HRESULT = unsafe { setsocket(self.0, opt, val, len) }; + let result: HRESULT = unsafe { setsocket(self.0, opt, val as *const T as *const libc::c_void, len) }; let error: windows::core::Error = windows::core::Error::from_hresult(result); match error.code().is_ok() { true => Ok(()), @@ -92,20 +93,23 @@ impl XdpSocket { } /// Get options from the target socket. - pub fn getsockopt( - &self, - api: &mut XdpApi, - opt: u32, - val: *mut std::ffi::c_void, - len: *mut u32, - ) -> Result<(), Fail> { + pub fn getsockopt(&self, api: &mut XdpApi, opt: u32, val: &mut T, len: &mut u32) -> Result<(), Fail> + where + T: std::fmt::Debug, + { let api: libxdp::XDP_API_TABLE = api.get(); + let val_ptr: *mut libc::c_void = val as *mut T as *mut libc::c_void; + trace!("getsockopt(): opt={}, val={:?}, len={}", opt, val_ptr, *len); + if let Some(getsockopt) = api.XskGetSockopt { - let result: HRESULT = unsafe { getsockopt(self.0, opt, val, len) }; + let result: HRESULT = unsafe { getsockopt(self.0, opt, val_ptr, len as *mut u32) }; let error: windows::core::Error = windows::core::Error::from_hresult(result); match error.code().is_ok() { - true => Ok(()), + true => { + trace!("getsockopt(): val={:?}", val); + Ok(()) + }, false => return Err(Fail::from(&error)), } } else { @@ -119,6 +123,8 @@ impl XdpSocket { pub fn activate(&self, api: &mut XdpApi, flags: i32) -> Result<(), Fail> { let api: libxdp::XDP_API_TABLE = api.get(); + trace!("activate(): flags={}", flags); + if let Some(activate) = api.XskActivate { let result: HRESULT = unsafe { activate(self.0, flags) }; let error: windows::core::Error = windows::core::Error::from_hresult(result); @@ -143,6 +149,8 @@ impl XdpSocket { ) -> Result<(), Fail> { let api: libxdp::XDP_API_TABLE = api.get(); + trace!("notify(): flags={}, timeout={}", flags, timeout); + if let Some(notify) = api.XskNotifySocket { let result: HRESULT = unsafe { notify(self.0, flags, timeout, result) }; let error: windows::core::Error = windows::core::Error::from_hresult(result); diff --git a/src/rust/catpowder/win/transport.rs b/src/rust/catpowder/win/transport.rs new file mode 100644 index 000000000..e72874a20 --- /dev/null +++ b/src/rust/catpowder/win/transport.rs @@ -0,0 +1,168 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//====================================================================================================================== +// Imports +//====================================================================================================================== + +use std::{ + future::Future, + net::{SocketAddr, SocketAddrV4}, +}; + +use crate::{ + catpowder::win::runtime::SharedCatpowderRuntime, + demi_sgarray_t, + demikernel::config::Config, + inetstack::{protocols::Protocol, SharedInetStack}, + runtime::{ + fail::Fail, + memory::{DemiBuffer, MemoryRuntime}, + network::transport::NetworkTransport, + SharedDemiRuntime, SharedObject, + }, + SocketOption, +}; + +//====================================================================================================================== +// Structures +//====================================================================================================================== + +/// Underlying network transport. +pub struct CatpowderTransport { + /// Catpowder runtime instance. + runtime: SharedCatpowderRuntime, + + /// Underlying inet stack. + inet_stack: SharedInetStack, +} + +/// A network transport built on top of Windows overlapped I/O. +#[derive(Clone)] +pub struct SharedCatpowderTransport(SharedObject); + +//====================================================================================================================== +// Associated Functions +//====================================================================================================================== + +impl SharedCatpowderTransport { + /// Creates a new network transport instance. + pub fn new( + config: &Config, + runtime: SharedDemiRuntime, + layer1_endpoint: SharedCatpowderRuntime, + ) -> Result { + let inet_stack: SharedInetStack = SharedInetStack::new(config, runtime, layer1_endpoint.clone())?; + Ok(Self(SharedObject::new(CatpowderTransport { + runtime: layer1_endpoint, + inet_stack, + }))) + } +} + +//====================================================================================================================== +// Trait Implementations +//====================================================================================================================== + +impl NetworkTransport for SharedCatpowderTransport { + type SocketDescriptor = ::SocketDescriptor; + + fn socket(&mut self, domain: socket2::Domain, typ: socket2::Type) -> Result { + self.0.inet_stack.socket(domain, typ) + } + + fn set_socket_option(&mut self, sd: &mut Self::SocketDescriptor, option: SocketOption) -> Result<(), Fail> { + self.0.inet_stack.set_socket_option(sd, option) + } + + fn get_socket_option( + &mut self, + sd: &mut Self::SocketDescriptor, + option: SocketOption, + ) -> Result { + self.0.inet_stack.get_socket_option(sd, option) + } + + fn getpeername(&mut self, sd: &mut Self::SocketDescriptor) -> Result { + self.0.inet_stack.getpeername(sd) + } + + fn bind(&mut self, sd: &mut Self::SocketDescriptor, local: std::net::SocketAddr) -> Result<(), Fail> { + let (protocol, port): (Protocol, u16) = self.0.runtime.bind(sd, local)?; + + if let Err(e) = self.0.inet_stack.bind(sd, local) { + if let Err(sub_err) = self.0.runtime.remove_rule(protocol, port) { + error!("Failed to remove rule: {:?}", sub_err); + } + Err(e) + } else { + Ok(()) + } + } + + fn listen(&mut self, sd: &mut Self::SocketDescriptor, backlog: usize) -> Result<(), Fail> { + self.0.inet_stack.listen(sd, backlog) + } + + fn hard_close(&mut self, sd: &mut Self::SocketDescriptor) -> Result<(), Fail> { + self.0.inet_stack.hard_close(sd) + } + + fn accept( + &mut self, + sd: &mut Self::SocketDescriptor, + ) -> impl Future> { + self.0.inet_stack.accept(sd) + } + + fn connect( + &mut self, + sd: &mut Self::SocketDescriptor, + remote: SocketAddr, + ) -> impl Future> { + self.0.inet_stack.connect(sd, remote) + } + + fn push( + &mut self, + sd: &mut Self::SocketDescriptor, + buf: &mut DemiBuffer, + addr: Option, + ) -> impl Future> { + self.0.inet_stack.push(sd, buf, addr) + } + + fn pop( + &mut self, + sd: &mut Self::SocketDescriptor, + size: usize, + ) -> impl Future, DemiBuffer), Fail>> { + self.0.inet_stack.pop(sd, size) + } + + fn close(&mut self, sd: &mut Self::SocketDescriptor) -> impl Future> { + self.0.inet_stack.close(sd) + } + + fn get_runtime(&self) -> &SharedDemiRuntime { + self.0.inet_stack.get_runtime() + } +} + +impl MemoryRuntime for SharedCatpowderTransport { + fn clone_sgarray(&self, sga: &demi_sgarray_t) -> Result { + self.0.inet_stack.clone_sgarray(sga) + } + + fn into_sgarray(&self, buf: DemiBuffer) -> Result { + self.0.inet_stack.into_sgarray(buf) + } + + fn sgaalloc(&self, size: usize) -> Result { + self.0.inet_stack.sgaalloc(size) + } + + fn sgafree(&self, sga: demi_sgarray_t) -> Result<(), Fail> { + self.0.inet_stack.sgafree(sga) + } +} diff --git a/src/rust/demikernel/libos/mod.rs b/src/rust/demikernel/libos/mod.rs index a3ea91bdd..833d65674 100644 --- a/src/rust/demikernel/libos/mod.rs +++ b/src/rust/demikernel/libos/mod.rs @@ -9,14 +9,14 @@ pub mod network; //====================================================================================================================== use self::name::LibOSName; -#[cfg(feature = "catnip-libos")] -use crate::catnip::runtime::SharedDPDKRuntime; +#[cfg(feature = "catnap-libos")] +use crate::catnap::transport::SharedCatnapTransport; #[cfg(feature = "catpowder-libos")] -use crate::catpowder::SharedCatpowderRuntime; -#[cfg(any(feature = "catpowder-libos", feature = "catnip-libos"))] -use crate::inetstack::SharedInetStack; +use crate::catpowder::{SharedCatpowderRuntime, SharedCatpowderTransport}; #[cfg(feature = "profiler")] use crate::perftools::profiler::set_callback; +#[cfg(feature = "catnip-libos")] +use crate::{catnip::runtime::SharedDPDKRuntime, inetstack::SharedInetStack}; use crate::{ demikernel::{ config::Config, @@ -37,9 +37,6 @@ use ::std::{ time::Duration, }; -#[cfg(feature = "catnap-libos")] -use crate::catnap::transport::SharedCatnapTransport; - //====================================================================================================================== // Structures //====================================================================================================================== @@ -93,12 +90,13 @@ impl LibOS { #[cfg(feature = "catpowder-libos")] LibOSName::Catpowder => { let layer1_endpoint: SharedCatpowderRuntime = SharedCatpowderRuntime::new(&config)?; - // This is our transport for Catpowder. - let inetstack: SharedInetStack = - SharedInetStack::new(&config, runtime.clone(), layer1_endpoint).unwrap(); - Self::NetworkLibOS(NetworkLibOSWrapper::Catpowder( - SharedNetworkLibOS::::new(runtime, inetstack), - )) + let catpowder_transport: SharedCatpowderTransport = + SharedCatpowderTransport::new(&config, runtime.clone(), layer1_endpoint.clone()).unwrap(); + Self::NetworkLibOS(NetworkLibOSWrapper::Catpowder(SharedNetworkLibOS::< + SharedCatpowderTransport, + >::new( + runtime, catpowder_transport + ))) }, #[cfg(feature = "catnip-libos")] LibOSName::Catnip => { diff --git a/src/rust/demikernel/libos/network/mod.rs b/src/rust/demikernel/libos/network/mod.rs index f8906a48d..969f7397b 100644 --- a/src/rust/demikernel/libos/network/mod.rs +++ b/src/rust/demikernel/libos/network/mod.rs @@ -27,7 +27,10 @@ use ::std::{ time::Duration, }; -#[cfg(any(feature = "catpowder-libos", feature = "catnip-libos"))] +#[cfg(feature = "catpowder-libos")] +use crate::catpowder::SharedCatpowderTransport; + +#[cfg(feature = "catnip-libos")] use crate::inetstack::SharedInetStack; #[cfg(all(feature = "catnap-libos"))] @@ -40,7 +43,7 @@ use crate::catnap::transport::SharedCatnapTransport; /// Network LIBOS. pub enum NetworkLibOSWrapper { #[cfg(feature = "catpowder-libos")] - Catpowder(SharedNetworkLibOS), + Catpowder(SharedNetworkLibOS), #[cfg(all(feature = "catnap-libos"))] Catnap(SharedNetworkLibOS), #[cfg(feature = "catnip-libos")] diff --git a/src/rust/inetstack/protocols/mod.rs b/src/rust/inetstack/protocols/mod.rs index 46dd34f75..352e03ba1 100644 --- a/src/rust/inetstack/protocols/mod.rs +++ b/src/rust/inetstack/protocols/mod.rs @@ -28,6 +28,7 @@ pub const MAX_HEADER_SIZE: usize = // Structures //====================================================================================================================== +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Protocol { Tcp, Udp,