diff --git a/network_simulator/input/tcp/close/close-local-retransmission.pkt b/network_simulator/input/tcp/close/close-local-retransmission.pkt index 466747fab..176e6f77e 100644 --- a/network_simulator/input/tcp/close/close-local-retransmission.pkt +++ b/network_simulator/input/tcp/close/close-local-retransmission.pkt @@ -32,6 +32,9 @@ // Receive ACK on data packet. +.1 TCP < . seq 1(0) ack 1001 win 65535 +// Send FIN again since no ack on it yet. ++.0 TCP > F. seq 1001(0) ack 1 win 65535 + // Send completes +.0 wait(500, ...) = 9 diff --git a/network_simulator/input/tcp/close/close-remote-retransmission.pkt b/network_simulator/input/tcp/close/close-remote-retransmission.pkt new file mode 100644 index 000000000..fdfa80684 --- /dev/null +++ b/network_simulator/input/tcp/close/close-remote-retransmission.pkt @@ -0,0 +1,38 @@ +// Tests for remote close. + +// Establish a connection. + +.0 socket(..., SOCK_STREAM, IPPROTO_TCP) = 500 ++.2 connect(500, ..., ...) = 0 + +// Send SYN segment. ++.0 TCP > S seq 0(0) win 65535 +// Receive SYN-ACK segment. ++.1 TCP < S. seq 0(0) ack 1 win 65535 +// Send ACK on SYN-ACK segment. ++.0 TCP > . seq 1(0) ack 1 win 65535 + +// Succeed to establish connection. ++.0 wait(500, ...) = 0 + +// Receive FIN segment. ++.1 TCP < F. seq 1(0) ack 1 win 65535 +// Send ACK on FIN segment. ++.0 TCP > . seq 1(0) ack 2 win 65534 + +// Close connection. ++.2 close(500) = 0 + +// Send FIN segment. ++.0 TCP > F. seq 1(0) ack 2 win 65534 + +// Re-send FIN segment. ++1 TCP > F. seq 1(0) ack 2 win 65534 + +// Re-send FIN segment. ++3 TCP > F. seq 1(0) ack 2 win 65534 + +// Receive ACK on FIN segment. ++.1 TCP < . seq 2(0) ack 2 win 65535 + +// Succeed to close connection immediately. ++.0 wait(500, ...) = 0 diff --git a/src/rust/inetstack/protocols/layer4/tcp/established/background/mod.rs b/src/rust/inetstack/protocols/layer4/tcp/established/background/mod.rs index 0f7ebde11..760edc47d 100644 --- a/src/rust/inetstack/protocols/layer4/tcp/established/background/mod.rs +++ b/src/rust/inetstack/protocols/layer4/tcp/established/background/mod.rs @@ -25,11 +25,6 @@ pub async fn background(cb: SharedControlBlock, _dead_socket_tx: mpsc::Unbounded let receiver = async_timer!("tcp::established::background::receiver", cb2.poll()).fuse(); pin_mut!(receiver); - let r = futures::select_biased! { - r = receiver => r, - r = acknowledger => r, - r = retransmitter => r, - r = sender => r, - }; + let r = futures::join!(receiver, acknowledger, retransmitter, sender); error!("Connection terminated: {:?}", r); } diff --git a/src/rust/inetstack/protocols/layer4/tcp/established/ctrlblk.rs b/src/rust/inetstack/protocols/layer4/tcp/established/ctrlblk.rs index 31f978c56..3a202d6ea 100644 --- a/src/rust/inetstack/protocols/layer4/tcp/established/ctrlblk.rs +++ b/src/rust/inetstack/protocols/layer4/tcp/established/ctrlblk.rs @@ -31,13 +31,13 @@ use crate::{ yield_with_timeout, SharedDemiRuntime, SharedObject, }, }; +use ::futures::never::Never; use ::std::{ collections::VecDeque, net::{Ipv4Addr, SocketAddrV4}, ops::{Deref, DerefMut}, time::{Duration, Instant}, }; -use futures::never::Never; //====================================================================================================================== // Constants @@ -92,20 +92,24 @@ struct Receiver { // // Sequence number of next byte of data in the unread queue. - reader_next: SeqNumber, + reader_next_seq_no: SeqNumber, // Sequence number of the next byte of data (or FIN) that we expect to receive. In RFC 793 terms, this is RCV.NXT. - receive_next: SeqNumber, + receive_next_seq_no: SeqNumber, + + // Sequnce number of the last byte of data (FIN). + fin_seq_no: SharedAsyncValue>, // Receive queue. Contains in-order received (and acknowledged) data ready for the application to read. recv_queue: AsyncQueue, } impl Receiver { - pub fn new(reader_next: SeqNumber, receive_next: SeqNumber) -> Self { + pub fn new(reader_next_seq_no: SeqNumber, receive_next_seq_no: SeqNumber) -> Self { Self { - reader_next, - receive_next, + reader_next_seq_no, + receive_next_seq_no, + fin_seq_no: SharedAsyncValue::new(None), recv_queue: AsyncQueue::with_capacity(MIN_RECV_QUEUE_SIZE_FRAMES), } } @@ -123,7 +127,14 @@ impl Receiver { self.recv_queue.pop(None).await? }; - self.reader_next = self.reader_next + SeqNumber::from(buf.len() as u32); + match buf.len() { + len if len > 0 => { + self.reader_next_seq_no = self.reader_next_seq_no + SeqNumber::from(buf.len() as u32); + }, + _ => { + self.reader_next_seq_no = self.reader_next_seq_no + 1.into(); + }, + } Ok(buf) } @@ -131,8 +142,29 @@ impl Receiver { pub fn push(&mut self, buf: DemiBuffer) { let buf_len: u32 = buf.len() as u32; self.recv_queue.push(buf); - // TODO: Does this need to roll over? - self.receive_next = self.receive_next + SeqNumber::from(buf_len as u32); + self.receive_next_seq_no = self.receive_next_seq_no + SeqNumber::from(buf_len as u32); + } + + pub fn push_fin(&mut self) { + self.recv_queue.push(DemiBuffer::new(0)); + debug_assert_eq!(self.receive_next_seq_no, self.fin_seq_no.get().unwrap()); + // Reset it to wake up any close coroutines waiting for FIN to arrive. + self.fin_seq_no.set(Some(self.receive_next_seq_no)); + // Move RECV_NXT over the FIN. + self.receive_next_seq_no = self.receive_next_seq_no + 1.into(); + } + + // Return Ok after FIN arrives (plus all previous data). + pub async fn wait_for_fin(&mut self) -> Result<(), Fail> { + let mut fin_seq_no: Option = self.fin_seq_no.get(); + loop { + match fin_seq_no { + Some(fin_seq_no) if self.receive_next_seq_no >= fin_seq_no => return Ok(()), + _ => { + fin_seq_no = self.fin_seq_no.wait_for_change(None).await?; + }, + } + } } } @@ -215,10 +247,6 @@ pub struct ControlBlock { // TODO: Change this to a single number for SND.UNA receive_ack_queue_frame_bytes: SharedAsyncQueue, - // The sequence number of the FIN, if we received it out-of-order. - // Note: This could just be a boolean to remember if we got a FIN; the sequence number is for checking correctness. - receive_out_of_order_fin: Option, - // This queue notifies the parent passive socket that created the socket that the socket is closing. This is / // necessary because routing for this socket goes through the parent socket if the connection set up is still // inflight (but also after the connection is established for some reason). @@ -273,7 +301,6 @@ impl SharedControlBlock { receive_buffer_size_frames: receive_window_size_frames, receive_window_scale_shift_bits, receive_out_of_order_frames: VecDeque::new(), - receive_out_of_order_fin: Option::None, receiver: Receiver::new(receive_initial_seq_no, receive_initial_seq_no), congestion_control_algorithm: congestion_control_algorithm_constructor( sender_mss, @@ -294,12 +321,6 @@ impl SharedControlBlock { self.remote } - pub fn send(&mut self, buf: DemiBuffer) -> Result<(), Fail> { - let self_: Self = self.clone(); - self.sender.immediate_send(buf, self_)?; - Ok(()) - } - pub async fn background_retransmitter(mut self) -> Result { let cb: Self = self.clone(); self.sender.background_retransmitter(cb).await @@ -348,28 +369,11 @@ impl SharedControlBlock { // This is the main TCP processing routine. pub async fn poll(&mut self) -> Result { + let mut receive_queue: SharedAsyncQueue<(Ipv4Addr, TcpHeader, DemiBuffer)> = self.recv_queue.clone(); + // Normal data processing in the Established state. loop { - let (header, data): (TcpHeader, DemiBuffer) = match self.recv_queue.pop(None).await { - Ok((_, header, data)) if self.state == State::Established => (header, data), - Ok(result) => { - self.recv_queue.push_front(result); - let cause: String = format!( - "ending receive polling loop for non-established connection (local={:?}, remote={:?})", - self.local, self.remote - ); - error!("poll(): {}", cause); - return Err(Fail::new(libc::ECANCELED, &cause)); - }, - Err(e) => { - let cause: String = format!( - "ending receive polling loop for active connection (local={:?}, remote={:?})", - self.local, self.remote - ); - warn!("poll(): {:?} ({:?})", cause, e); - return Err(e); - }, - }; + let (_, header, data): (Ipv4Addr, TcpHeader, DemiBuffer) = receive_queue.pop(None).await?; debug!( "{:?} Connection Receiving {} bytes + {:?}", @@ -380,20 +384,20 @@ impl SharedControlBlock { match self.process_packet(header, data) { Ok(()) => (), - Err(e) if e.errno == libc::ECONNRESET => { - if let Some(mut socket_tx) = self.parent_passive_socket_close_queue.take() { - socket_tx.push(self.remote); - } - self.state = State::CloseWait; - let cause: String = format!( - "remote closed connection, stopping processing (local={:?}, remote={:?})", - self.local, self.remote - ); - error!("poll(): {}", cause); - return Err(Fail::new(libc::ECANCELED, &cause)); - }, Err(e) => debug!("Dropped packet: {:?}", e), } + + // Check if we have received everything past the FIN on this connection, then it is safe to exit this loop. + if self.state == State::Closed || self.state == State::TimeWait { + let cause: String = format!( + "ending receive polling loop for active connection (local={:?}, remote={:?})", + self.local, self.remote + ); + if let Some(mut socket_tx) = self.parent_passive_socket_close_queue.take() { + socket_tx.push(self.remote); + } + return Err(Fail::new(libc::ECONNRESET, &cause)); + } } } @@ -402,7 +406,6 @@ impl SharedControlBlock { /// packet should be dropped after the step. fn process_packet(&mut self, mut header: TcpHeader, mut data: DemiBuffer) -> Result<(), Fail> { let mut seg_start: SeqNumber = header.seq_num; - let mut seg_end: SeqNumber = seg_start; let mut seg_len: u32 = data.len() as u32; @@ -417,10 +420,39 @@ impl SharedControlBlock { warn!("Got packet with URG bit set!"); } - if data.len() > 0 || header.fin { - self.process_data(&mut header, data, seg_start, seg_end, seg_len)?; + if data.len() > 0 { + self.process_data(data, seg_start, seg_end, seg_len)?; + } + + // Process FIN flag. + if header.fin { + match self.receiver.fin_seq_no.get() { + // We've already received this FIN, so ignore. + Some(seq_no) if seq_no != seg_end => warn!( + "Received a FIN with a different sequence number, ignoring. previous={:?} new={:?}", + seq_no, seg_end, + ), + Some(_) => trace!("Received duplicate FIN"), + None => { + trace!("Received FIN"); + self.receiver.fin_seq_no.set(seg_end.into()) + }, + } + } + // Check whether we've received the last packet. + if self + .receiver + .fin_seq_no + .get() + .is_some_and(|seq_no| seq_no == self.receiver.receive_next_seq_no) + { + self.process_fin(); + } + if header.fin { + // Send ack for out of order FIN. + trace!("Acking FIN"); + self.send_ack() } - self.process_remote_close(&header)?; // We should ACK this segment, preferably via piggybacking on a response. // TODO: Consider replacing the delayed ACK timer with a simple flag. if self.receive_ack_deadline_time_secs.get().is_none() { @@ -481,7 +513,7 @@ impl SharedControlBlock { *seg_end = *seg_start + SeqNumber::from(*seg_len - 1); } - let receive_next: SeqNumber = self.receiver.receive_next; + let receive_next: SeqNumber = self.receiver.receive_next_seq_no; let after_receive_window: SeqNumber = receive_next + SeqNumber::from(self.get_receive_window_size()); @@ -651,32 +683,21 @@ impl SharedControlBlock { fn process_data( &mut self, - header: &mut TcpHeader, data: DemiBuffer, seg_start: SeqNumber, - mut seg_end: SeqNumber, - mut seg_len: u32, + seg_end: SeqNumber, + seg_len: u32, ) -> Result<(), Fail> { // We can only process in-order data (or FIN). Check for out-of-order segment. - if seg_start != self.receiver.receive_next { + if seg_start != self.receiver.receive_next_seq_no { debug!("Received out-of-order segment"); // This segment is out-of-order. If it carries data, and/or a FIN, we should store it for later processing // after the "hole" in the sequence number space has been filled. if seg_len > 0 { match self.state { State::Established | State::FinWait1 | State::FinWait2 => { - // We can only legitimately receive data in ESTABLISHED, FIN-WAIT-1, and FIN-WAIT-2. - if header.fin { - seg_len -= 1; - self.store_out_of_order_fin(seg_end); - seg_end = seg_end - SeqNumber::from(1); - // Clear header FIN and process later. - header.fin = false; - } debug_assert_eq!(seg_len, data.len() as u32); - if seg_len > 0 { - self.store_out_of_order_segment(seg_start, seg_end, data); - } + self.store_out_of_order_segment(seg_start, seg_end, data); // Sending an ACK here is only a "MAY" according to the RFCs, but helpful for fast retransmit. trace!("process_data(): send ack on out-of-order segment"); self.send_ack(); @@ -690,7 +711,7 @@ impl SharedControlBlock { } // We can only legitimately receive data in ESTABLISHED, FIN-WAIT-1, and FIN-WAIT-2. - header.fin |= self.receive_data(seg_start, data); + self.receive_data(seg_start, data); Ok(()) } @@ -702,7 +723,7 @@ impl SharedControlBlock { // Note that once we reach a synchronized state we always include a valid acknowledgement number. header.ack = true; - header.ack_num = self.receiver.receive_next; + header.ack_num = self.receiver.receive_next_seq_no; // Return this header. header @@ -736,7 +757,6 @@ impl SharedControlBlock { // This routine should only ever be called to send TCP segments that contain a valid ACK value. debug_assert!(header.ack); - let sent_fin: bool = header.fin; let remote_ipv4_addr: Ipv4Addr = self.remote.ip().clone(); header.serialize_and_attach( &mut pkt, @@ -759,20 +779,6 @@ impl SharedControlBlock { // Since we sent an ACK, cancel any outstanding delayed ACK request. self.set_receive_ack_deadline(None); - - // If we sent a FIN, update our protocol state. - if sent_fin { - match self.state { - // Active close. - State::Established => self.state = State::FinWait1, - // Passive close. - State::CloseWait => self.state = State::LastAck, - // We can legitimately retransmit the FIN in these states. And we stay there until the FIN is ACK'd. - State::FinWait1 | State::LastAck => {}, - // We shouldn't be sending a FIN from any other state. - state => unreachable!("Sent FIN while in nonsensical TCP state {:?}", state), - } - } } pub fn get_receive_ack_deadline(&self) -> SharedAsyncValue> { @@ -784,7 +790,7 @@ impl SharedControlBlock { } pub fn get_receive_window_size(&self) -> u32 { - let bytes_unread: u32 = (self.receiver.receive_next - self.receiver.reader_next).into(); + let bytes_unread: u32 = (self.receiver.receive_next_seq_no - self.receiver.reader_next_seq_no).into(); self.receive_buffer_size_frames - bytes_unread } @@ -803,21 +809,8 @@ impl SharedControlBlock { hdr_window_size } - pub async fn push(&mut self, mut nbytes: usize) -> Result<(), Fail> { - loop { - let n: usize = self.receive_ack_queue_frame_bytes.pop(None).await?; - - if n > nbytes { - self.receive_ack_queue_frame_bytes.push_front(n - nbytes); - break Ok(()); - } - - nbytes -= n; - - if nbytes == 0 { - break Ok(()); - } - } + pub async fn push(&mut self, buf: DemiBuffer) -> Result<(), Fail> { + self.sender.push(buf).await } pub async fn pop(&mut self, size: Option) -> Result { @@ -831,12 +824,6 @@ impl SharedControlBlock { self.receiver.pop(size).await } - // This routine remembers that we have received an out-of-order FIN. - // - fn store_out_of_order_fin(&mut self, fin: SeqNumber) { - self.receive_out_of_order_fin = Some(fin); - } - // This routine takes an incoming TCP segment and adds it to the out-of-order receive queue. // If the new segment had a FIN it has been removed prior to this routine being called. // Note: Since this is not the "fast path", this is written for clarity over efficiency. @@ -948,8 +935,8 @@ impl SharedControlBlock { // // Returns true if a previously out-of-order segment containing a FIN has now been received. // - fn receive_data(&mut self, seg_start: SeqNumber, buf: DemiBuffer) -> bool { - let recv_next: SeqNumber = self.receiver.receive_next; + fn receive_data(&mut self, seg_start: SeqNumber, buf: DemiBuffer) { + let recv_next: SeqNumber = self.receiver.receive_next_seq_no; // This routine should only be called with in-order segment data. debug_assert_eq!(seg_start, recv_next); @@ -978,62 +965,17 @@ impl SharedControlBlock { } } } - - // TODO: Review recent change to update control block copy of recv_next upon each push to the receiver. - // When receiving a retransmitted segment that fills a "hole" in the receive space, thus allowing a number - // (potentially large number) of out-of-order segments to be added, we'll be modifying the TCB copy of - // recv_next many times. - // Anyhow that recent change removes the need for the following two lines: - // Update our receive sequence number (i.e. RCV.NXT) appropriately. - // self.receive_next.set(recv_next); - - // This is a lot of effort just to check the FIN sequence number is correct in debug builds. - // Regardless of whether we have out of order data, check for the FIN because it might not have come on a data - // carrying packet. - match self.receive_out_of_order_fin { - Some(fin) if fin == recv_next => { - return true; - }, - _ => (), - } - - false - } - - fn process_remote_close(&mut self, header: &TcpHeader) -> Result<(), Fail> { - if header.fin { - trace!("Received FIN"); - // 2. Push empty buffer to indicate EOF. - // TODO: set err bit and wake. - self.receiver.push(DemiBuffer::new(0)); - - // 3. Advance RCV.NXT over the FIN. - self.receiver.receive_next = self.receiver.receive_next + SeqNumber::from(1); - - // 4. Since we consumed the FIN we ACK immediately rather than opportunistically. - // TODO: Consider doing this opportunistically. Note our current tests expect the immediate behavior. - trace!("process_remote_close(): send ack on received fin"); - self.send_ack(); - let cause: String = format!("connection received FIN"); - info!("process_remote_close(): {}", cause); - if let Some(mut socket_tx) = self.parent_passive_socket_close_queue.take() { - socket_tx.push(self.remote); - } - return Err(Fail::new(libc::ECONNRESET, &cause)); - } - - Ok(()) } - /// Send a fin by pushing a zero-length DemiBuffer to the sender function. - fn send_fin(&mut self) { - // Construct FIN. - // TODO: Remove this allocation. - let fin_buf: DemiBuffer = DemiBuffer::new_with_headroom(0, MAX_HEADER_SIZE as u16); - // Send. - if let Err(e) = self.send(fin_buf) { - warn!("send_fin(): failed to send fin ({:?})", e); - } + fn process_fin(&mut self) { + let state = match self.state { + State::Established => State::CloseWait, + State::FinWait1 => State::Closing, + State::FinWait2 => State::TimeWait, + state => unreachable!("Cannot be in any other state at this point: {:?}", state), + }; + self.state = state; + self.receiver.push_fin(); } // This coroutine runs the close protocol. @@ -1051,47 +993,23 @@ impl SharedControlBlock { } async fn local_close(&mut self) -> Result<(), Fail> { - // 0. Set state. + // 1. Start close protocol by setting state and sending FIN. self.state = State::FinWait1; - // 1. Send FIN. - self.send_fin(); - - // 2. TIME_WAIT - while self.state != State::TimeWait { - // Wait for next packet. - let (_, header, _) = self.recv_queue.pop(None).await?; - - // Check ACK. - self.state = match self.process_ack(&header) { - // Got ACK to our FIN. - Ok(()) => match self.state { - State::FinWait1 => State::FinWait2, - State::FinWait2 => State::FinWait2, - State::Closing => State::TimeWait, - state => unreachable!("Cannot be in any other state at this point: {:?}", state), - }, - // Don't do anything if this is an unexpected message. - Err(_) => self.state, - }; - - // TODO: Receive data in the FINWAIT-1 and FINWAIT-2 states. - - // Check FIN. - self.state = match self.process_remote_close(&header) { - // No FIN, keep waiting. - Ok(()) => self.state, - // Found FIN, move to next state. - Err(e) if e.errno == libc::ECONNRESET => match self.state { - State::FinWait1 => State::Closing, - State::FinWait2 => State::TimeWait, - state => unreachable!("Cannot be in any other state: {:?}", state), - }, - // Some other error, stop close protocol. - Err(e) => return Err(e), - } - } - + self.sender.push_fin_and_wait_for_ack().await?; + + // 2. Got ACK to our FIN. Check if we also received a FIN from remote in the meantime. + let state: State = self.state; + match state { + State::FinWait1 => { + self.state = State::FinWait2; + // Haven't received a FIN yet from remote, so wait. + self.receiver.wait_for_fin().await?; + }, + State::Closing => self.state = State::TimeWait, + state => unreachable!("Cannot be in any other state at this point: {:?}", state), + }; // 3. TIMED_WAIT + debug_assert_eq!(self.state, State::TimeWait); trace!("socket options: {:?}", self.socket_options.get_linger()); let timeout: Duration = self.socket_options.get_linger().unwrap_or(MSL * 2); yield_with_timeout(timeout).await; @@ -1100,21 +1018,11 @@ impl SharedControlBlock { } async fn remote_already_closed(&mut self) -> Result<(), Fail> { - // 0. Set state. + // 0. Move state forward self.state = State::LastAck; - // 1. Send FIN. - self.send_fin(); - // Wait for ACK of FIN. - loop { - // Wait for next packet. - let (_, header, _) = self.recv_queue.pop(None).await?; - - // Check ACK. - match self.process_ack(&header) { - Ok(()) => break, - Err(_) => (), - } - } + // 1. Send FIN and wait for ack before closing. + self.sender.push_fin_and_wait_for_ack().await?; + self.state = State::Closed; Ok(()) } } diff --git a/src/rust/inetstack/protocols/layer4/tcp/established/mod.rs b/src/rust/inetstack/protocols/layer4/tcp/established/mod.rs index aab63a19e..809f29e41 100644 --- a/src/rust/inetstack/protocols/layer4/tcp/established/mod.rs +++ b/src/rust/inetstack/protocols/layer4/tcp/established/mod.rs @@ -104,12 +104,8 @@ impl EstablishedSocket { self.recv_queue.clone() } - pub fn send(&mut self, buf: DemiBuffer) -> Result<(), Fail> { - self.cb.send(buf) - } - - pub async fn push(&mut self, nbytes: usize) -> Result<(), Fail> { - self.cb.push(nbytes).await + pub async fn push(&mut self, buf: DemiBuffer) -> Result<(), Fail> { + self.cb.push(buf).await } pub async fn pop(&mut self, size: Option) -> Result { diff --git a/src/rust/inetstack/protocols/layer4/tcp/established/sender.rs b/src/rust/inetstack/protocols/layer4/tcp/established/sender.rs index 38ec58206..cf23e42fc 100644 --- a/src/rust/inetstack/protocols/layer4/tcp/established/sender.rs +++ b/src/rust/inetstack/protocols/layer4/tcp/established/sender.rs @@ -3,7 +3,6 @@ use crate::{ collections::{async_queue::SharedAsyncQueue, async_value::SharedAsyncValue}, - expect_ok, inetstack::protocols::layer4::tcp::{ established::{rto::RtoCalculator, SharedControlBlock}, header::TcpHeader, @@ -25,7 +24,7 @@ use std::cmp; // buffer structure that held everything we need directly, thus avoiding this extra wrapper. // pub struct UnackedSegment { - pub bytes: DemiBuffer, + pub bytes: Option, // Set to `None` on retransmission to implement Karn's algorithm. pub initial_tx: Option, } @@ -73,11 +72,19 @@ pub struct Sender { // Retransmission Timeout (RTO) calculator. rto_calculator: RtoCalculator, - // Sequence Number of the next data to be sent. In RFC 793 terms, this is SND.NXT. - send_next: SharedAsyncValue, + // In RFC 793 terms, this is SND.NXT. + send_next_seq_no: SharedAsyncValue, - // This is the send buffer (user data we do not yet have window to send). - unsent_queue: SharedAsyncQueue, + // Sequence number of next data to be pushed but not sent. When there is an open window, this is equivalent to + // send_next_seq_no. + unsent_next_seq_no: SeqNumber, + + // Sequence number of the FIN, after we should never allocate more sequence numbers. + fin_seq_no: Option, + + // This is the send buffer (user data we do not yet have window to send). If the option is None, then it indicates + // a FIN. This keeps us from having to allocate an empty Demibuffer to indicate FIN. + unsent_queue: SharedAsyncQueue>, // Available window to send into, as advertised by our peer. In RFC 793 terms, this is SND.WND. send_window: SharedAsyncValue, @@ -96,7 +103,7 @@ impl fmt::Debug for Sender { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Sender") .field("send_unacked", &self.send_unacked) - .field("send_next", &self.send_next) + .field("send_next", &self.send_next_seq_no) .field("send_window", &self.send_window) .field("window_scale", &self.send_window_scale_shift_bits) .field("mss", &self.mss) @@ -111,7 +118,9 @@ impl Sender { unacked_queue: SharedAsyncQueue::with_capacity(MIN_UNACKED_QUEUE_SIZE_FRAMES), retransmit_deadline_time_secs: SharedAsyncValue::new(None), rto_calculator: RtoCalculator::new(), - send_next: SharedAsyncValue::new(seq_no), + send_next_seq_no: SharedAsyncValue::new(seq_no), + unsent_next_seq_no: seq_no, + fin_seq_no: None, unsent_queue: SharedAsyncQueue::with_capacity(MIN_UNSENT_QUEUE_SIZE_FRAMES), send_window: SharedAsyncValue::new(send_window), send_window_last_update_seq: seq_no, @@ -121,35 +130,172 @@ impl Sender { } } - pub fn get_open_window_size_bytes(&mut self, cb: &mut SharedControlBlock) -> usize { - // Calculate amount of data in flight (SND.NXT - SND.UNA). - let send_unacknowledged: SeqNumber = self.send_unacked.get(); - let send_next: SeqNumber = self.send_next.get(); - let sent_data: u32 = (send_next - send_unacknowledged).into(); + // This function sends a packet and waits for it to be acked. + pub async fn push(&mut self, buf: DemiBuffer) -> Result<(), Fail> { + // If the user is done sending (i.e. has called close on this connection), then they shouldn't be sending. + debug_assert!(self.fin_seq_no.is_none()); + // Our API supports send buffers up to usize (variable, depends upon architecture) in size. While we could + // allow for larger send buffers, it is simpler and more practical to limit a single send to 1 GiB, which is + // also the maximum value a TCP can advertise as its receive window (with maximum window scaling). + // TODO: the below check just limits a single send to 4 GiB, not 1 GiB. Check this doesn't break anything. + // + // Review: Move this check up the stack (i.e. closer to the user)? + // + let _: u32 = buf + .len() + .try_into() + .map_err(|_| Fail::new(EINVAL, "buffer too large"))?; - // Before we get cwnd for the check, we prompt it to shrink it if the connection has been idle. - cb.congestion_control_on_cwnd_check_before_send(); - let cwnd: SharedAsyncValue = cb.congestion_control_get_cwnd(); + // TODO: We need to fix this the correct way: limit our send buffer size to the amount we're willing to buffer. + if self.unsent_queue.len() > UNSENT_QUEUE_CUTOFF { + return Err(Fail::new(EBUSY, "too many packets to send")); + } - // The limited transmit algorithm can increase the effective size of cwnd by up to 2MSS. - let effective_cwnd: u32 = cwnd.get() + cb.congestion_control_get_limited_transmit_cwnd_increase().get(); + // Place the buffer in the unsent queue. + self.unsent_next_seq_no = self.unsent_next_seq_no + (buf.len() as u32).into(); + self.unsent_queue.push(Some(buf)); - let win_sz: u32 = self.send_window.get(); + // Wait until the sequnce number of the pushed buffer is acknowledged. + let mut send_unacked_watched: SharedAsyncValue = self.send_unacked.clone(); + let ack_seq_no: SeqNumber = self.unsent_next_seq_no; + debug_assert!(send_unacked_watched.get() < ack_seq_no); + while send_unacked_watched.get() < ack_seq_no { + send_unacked_watched.wait_for_change(None).await?; + } + Ok(()) + } - if win_sz > 0 && win_sz >= sent_data && effective_cwnd >= sent_data { - cmp::min( - cmp::min((win_sz - sent_data) as usize, self.mss), - (effective_cwnd - sent_data) as usize, - ) - } else { - 0 + // Places a FIN marker in the outgoing data stream. No data can be pushed after this. + pub async fn push_fin_and_wait_for_ack(&mut self) -> Result<(), Fail> { + debug_assert!(self.fin_seq_no.is_none()); + // TODO: We need to fix this the correct way: limit our send buffer size to the amount we're willing to buffer. + if self.unsent_queue.len() > UNSENT_QUEUE_CUTOFF { + return Err(Fail::new(EBUSY, "too many packets to send")); + } + + self.fin_seq_no = Some(self.unsent_next_seq_no); + self.unsent_next_seq_no = self.unsent_next_seq_no + 1.into(); + self.unsent_queue.push(None); + + let mut send_unacked_watched: SharedAsyncValue = self.send_unacked.clone(); + let fin_ack_num: SeqNumber = self.unsent_next_seq_no; + while self.send_unacked.get() < fin_ack_num { + send_unacked_watched.wait_for_change(None).await?; + } + Ok(()) + } + + pub async fn background_sender(&mut self, mut cb: SharedControlBlock) -> Result { + loop { + // Get next bit of unsent data. + if let Some(buf) = self.unsent_queue.pop(None).await? { + self.send_buffer(buf, &mut cb).await?; + } else { + self.send_fin(&mut cb)?; + // Exit the loop because we no longer have anything to process + return Err(Fail::new(libc::ECONNRESET, "Processed and sent FIN")); + } + } + } + + fn send_fin(&mut self, cb: &mut SharedControlBlock) -> Result<(), Fail> { + let mut header: TcpHeader = cb.tcp_header(); + header.seq_num = self.send_next_seq_no.get(); + debug_assert!(self.fin_seq_no.is_some_and(|s| { s == header.seq_num })); + header.fin = true; + cb.emit(header, None); + // Update SND.NXT. + self.send_next_seq_no.modify(|s| s + 1.into()); + + // Add the FIN to our unacknowledged queue. + let unacked_segment = UnackedSegment { + bytes: None, + initial_tx: Some(cb.get_now()), + }; + self.unacked_queue.push(unacked_segment); + // Set the retransmit timer. + if self.retransmit_deadline_time_secs.get().is_none() { + let rto: Duration = self.rto_calculator.rto(); + trace!("set retransmit: {:?}", rto); + self.retransmit_deadline_time_secs.set(Some(cb.get_now() + rto)); + } + Ok(()) + } + + async fn send_buffer(&mut self, mut buffer: DemiBuffer, cb: &mut SharedControlBlock) -> Result<(), Fail> { + let mut send_unacked_watched: SharedAsyncValue = self.send_unacked.clone(); + let mut cwnd_watched: SharedAsyncValue = cb.congestion_control_get_cwnd(); + + // The limited transmit algorithm may increase the effective size of cwnd by up to 2 * mss. + let mut ltci_watched: SharedAsyncValue = cb.congestion_control_get_limited_transmit_cwnd_increase(); + let mut win_sz_watched: SharedAsyncValue = self.send_window.clone(); + + // Try in a loop until we send this segment. + loop { + // If we don't have any window size at all, we need to transition to PERSIST mode and + // repeatedly send window probes until window opens up. + if win_sz_watched.get() == 0 { + // Send a window probe (this is a one-byte packet designed to elicit a window update from our peer). + self.send_window_probe(buffer.split_front(1)?, cb).await?; + } else { + // TODO: Nagle's algorithm - We need to coalese small buffers together to send MSS sized packets. + // TODO: Silly window syndrome - See RFC 1122's discussion of the SWS avoidance algorithm. + + // We have some window, try to send some or all of the segment. + let _: usize = self.send_segment(&mut buffer, cb); + // If the buffer is now empty, then we sent all of it. + if buffer.len() == 0 { + return Ok(()); + } + // Otherwise, wait until something limiting the window changes and then try again to finish sending + // the segment. + futures::select_biased! { + _ = send_unacked_watched.wait_for_change(None).fuse() => (), + _ = self.send_next_seq_no.wait_for_change(None).fuse() => (), + _ = win_sz_watched.wait_for_change(None).fuse() => (), + _ = cwnd_watched.wait_for_change(None).fuse() => (), + _ = ltci_watched.wait_for_change(None).fuse() => (), + }; + } + } + } + + async fn send_window_probe(&mut self, probe: DemiBuffer, cb: &mut SharedControlBlock) -> Result<(), Fail> { + // Update SND.NXT. + self.send_next_seq_no.modify(|s| s + SeqNumber::from(1)); + + // Add the probe byte (as a new separate buffer) to our unacknowledged queue. + let unacked_segment = UnackedSegment { + bytes: Some(probe.clone()), + initial_tx: Some(cb.get_now()), + }; + self.unacked_queue.push(unacked_segment); + + // Note that we loop here *forever*, exponentially backing off. + // TODO: Use the correct PERSIST mode timer here. + let mut timeout: Duration = Duration::from_secs(1); + let mut win_sz_watched: SharedAsyncValue = self.send_window.clone(); + loop { + // Create packet. + let mut header: TcpHeader = cb.tcp_header(); + header.seq_num = self.send_next_seq_no.get(); + cb.emit(header, Some(probe.clone())); + + match win_sz_watched.wait_for_change(Some(timeout)).await { + Ok(_) => return Ok(()), + Err(Fail { errno, cause: _ }) if errno == libc::ETIMEDOUT => timeout *= 2, + Err(_) => unreachable!( + "either the ack deadline changed or the deadline passed, no other errors are possible!" + ), + } } } - // Takes a segment and attempts to send it. Returns the number of bytes sent and any unsent data remains in - // `segment`. - pub fn send_segment(&mut self, segment: &mut DemiBuffer, cb: &mut SharedControlBlock) -> usize { + // Takes a segment and attempts to send it. The buffer must be non-zero length and the function returns the number + // of bytes sent. + fn send_segment(&mut self, segment: &mut DemiBuffer, cb: &mut SharedControlBlock) -> usize { let buf_len: usize = segment.len(); + debug_assert_ne!(buf_len, 0); // Check window size. let max_frame_size_bytes: usize = match self.get_open_window_size_bytes(cb) { 0 => return 0, @@ -158,43 +304,38 @@ impl Sender { // Split the packet if necessary. // TODO: Use a scatter/gather array to coalesce multiple buffers into a single segment. - let (segment_data, do_push): (DemiBuffer, bool) = { + let (frame_size_bytes, do_push): (usize, bool) = { if buf_len > max_frame_size_bytes { - let outgoing_frame: DemiBuffer = segment - .split_front(max_frame_size_bytes) - .expect("Should be able to splite withint the length of the buffer"); - // Suppress PSH flag for partial buffers. - (outgoing_frame, false) + (max_frame_size_bytes, false) } else { // We can just send the whole packet. Clone it so we can attach headers/retransmit it later. - (segment.clone(), true) + (buf_len, true) } }; - let mut segment_data_len: u32 = segment_data.len() as u32; + let segment_data: DemiBuffer = segment + .split_front(frame_size_bytes) + .expect("Should be able to split within the length of the buffer"); + + let segment_data_len: u32 = segment_data.len() as u32; let rto: Duration = self.rto_calculator.rto(); - cb.congestion_control_on_send(rto, (self.send_next.get() - self.send_unacked.get()).into()); + cb.congestion_control_on_send(rto, (self.send_next_seq_no.get() - self.send_unacked.get()).into()); // Prepare the segment and send it. let mut header: TcpHeader = cb.tcp_header(); - header.seq_num = self.send_next.get(); - if segment_data_len == 0 { - // This buffer is the end-of-send marker. - // Set FIN and adjust sequence number consumption accordingly. - header.fin = true; - segment_data_len = 1; - } else if do_push { + header.seq_num = self.send_next_seq_no.get(); + if do_push { header.psh = true; } cb.emit(header, Some(segment_data.clone())); // Update SND.NXT. - self.send_next.modify(|s| s + SeqNumber::from(segment_data_len)); + self.send_next_seq_no.modify(|s| s + SeqNumber::from(segment_data_len)); // Put this segment on the unacknowledged list. let unacked_segment = UnackedSegment { - bytes: segment_data, + bytes: Some(segment_data), initial_tx: Some(cb.get_now()), }; self.unacked_queue.push(unacked_segment); @@ -207,114 +348,37 @@ impl Sender { segment_data_len as usize } - // This function attempts to send a buffer. If it is able to send, it sends immediately, otherwise, it places the - // buffer in the unsent queue. - pub fn immediate_send(&mut self, mut buf: DemiBuffer, mut cb: SharedControlBlock) -> Result { - // If the user is done sending (i.e. has called close on this connection), then they shouldn't be sending. + fn get_open_window_size_bytes(&mut self, cb: &mut SharedControlBlock) -> usize { + // Calculate amount of data in flight (SND.NXT - SND.UNA). + let send_unacknowledged: SeqNumber = self.send_unacked.get(); + let send_next: SeqNumber = self.send_next_seq_no.get(); + let sent_data: u32 = (send_next - send_unacknowledged).into(); - // Our API supports send buffers up to usize (variable, depends upon architecture) in size. While we could - // allow for larger send buffers, it is simpler and more practical to limit a single send to 1 GiB, which is - // also the maximum value a TCP can advertise as its receive window (with maximum window scaling). - // TODO: the below check just limits a single send to 4 GiB, not 1 GiB. Check this doesn't break anything. - // - // Review: Move this check up the stack (i.e. closer to the user)? - // - let _: u32 = buf - .len() - .try_into() - .map_err(|_| Fail::new(EINVAL, "buffer too large"))?; + // Before we get cwnd for the check, we prompt it to shrink it if the connection has been idle. + cb.congestion_control_on_cwnd_check_before_send(); + let cwnd: SharedAsyncValue = cb.congestion_control_get_cwnd(); - // If the send queue is empty and then we can try to send now. - let sent_bytes: usize = if self.unsent_queue.is_empty() { - trace!("Send immediate"); - self.send_segment(&mut buf, &mut cb) - } else { - 0 - }; + // The limited transmit algorithm can increase the effective size of cwnd by up to 2MSS. + let effective_cwnd: u32 = cwnd.get() + cb.congestion_control_get_limited_transmit_cwnd_increase().get(); - // We did not successfully send some or all of the message. - if sent_bytes == 0 { - // TODO: We need to fix this the correct way: limit our send buffer size to the amount we're willing to buffer. - if self.unsent_queue.len() > UNSENT_QUEUE_CUTOFF { - return Err(Fail::new(EBUSY, "too many packets to send")); - } + let win_sz: u32 = self.send_window.get(); - // Slow path: Delegating sending the data to background processing. - trace!("Queueing Send for background processing"); - self.unsent_queue.push(buf); + if Self::has_open_window(win_sz, sent_data, effective_cwnd) { + Self::calculate_open_window_bytes(win_sz, sent_data, self.mss, effective_cwnd) + } else { + 0 } - Ok(sent_bytes) } - pub async fn background_sender(&mut self, mut cb: SharedControlBlock) -> Result { - let mut send_unacked_watched: SharedAsyncValue = self.send_unacked.clone(); - let mut cwnd_watched: SharedAsyncValue = cb.congestion_control_get_cwnd(); - - // The limited transmit algorithm may increase the effective size of cwnd by up to 2 * mss. - let mut ltci_watched: SharedAsyncValue = cb.congestion_control_get_limited_transmit_cwnd_increase(); - let mut win_sz_watched: SharedAsyncValue = self.send_window.clone(); + fn has_open_window(win_sz: u32, sent_data: u32, effective_cwnd: u32) -> bool { + win_sz > 0 && win_sz >= sent_data && effective_cwnd >= sent_data + } - loop { - // Get next bit of unsent data. - let mut segment = self.unsent_queue.pop(None).await?; - - // Try in a loop until we send this segment. - loop { - // If we don't have any window size at all, we need to transition to PERSIST mode and - // repeatedly send window probes until window opens up. - if win_sz_watched.get() == 0 { - // Send a window probe (this is a one-byte packet designed to elicit a window update from our peer). - let buf: DemiBuffer = segment.split_front(1)?; - // Update SND.NXT. - self.send_next.modify(|s| s + SeqNumber::from(1)); - - // Add the probe byte (as a new separate buffer) to our unacknowledged queue. - let unacked_segment = UnackedSegment { - bytes: buf.clone(), - initial_tx: Some(cb.get_now()), - }; - self.unacked_queue.push(unacked_segment); - - // Note that we loop here *forever*, exponentially backing off. - // TODO: Use the correct PERSIST mode timer here. - let mut timeout: Duration = Duration::from_secs(1); - loop { - // Create packet. - let mut header: TcpHeader = cb.tcp_header(); - header.seq_num = self.send_next.get(); - cb.emit(header, Some(buf.clone())); - - match win_sz_watched.wait_for_change(Some(timeout)).await { - Ok(_) => break, - Err(Fail { errno, cause: _ }) if errno == libc::ETIMEDOUT => timeout *= 2, - Err(_) => unreachable!( - "either the ack deadline changed or the deadline passed, no other errors are possible!" - ), - } - } - } else { - // TODO: Nagle's algorithm - We need to coalese small buffers together to send MSS sized packets. - // TODO: Silly window syndrome - See RFC 1122's discussion of the SWS avoidance algorithm. - - // We have some window, try to send some or all of the segment. - // If the segment started out as 0 bytes of length, we still need to send it as a FIN. - let sent_bytes: usize = self.send_segment(&mut segment, &mut cb); - // If we sent at least a byte and the segment is now empty, then we are done. - if sent_bytes > 0 && segment.len() == 0 { - break; - } - // Otherwise, wait until something limiting the window changes and then try again to finish sending - // the segment. - futures::select_biased! { - _ = send_unacked_watched.wait_for_change(None).fuse() => (), - _ = self.send_next.wait_for_change(None).fuse() => (), - _ = win_sz_watched.wait_for_change(None).fuse() => (), - _ = cwnd_watched.wait_for_change(None).fuse() => (), - _ = ltci_watched.wait_for_change(None).fuse() => (), - }; - } - } - } + fn calculate_open_window_bytes(win_sz: u32, sent_data: u32, mss: usize, effective_cwnd: u32) -> usize { + cmp::min( + cmp::min((win_sz - sent_data) as usize, mss), + (effective_cwnd - sent_data) as usize, + ) } pub async fn background_retransmitter(&mut self, mut cb: SharedControlBlock) -> Result { @@ -343,10 +407,15 @@ impl Sender { }; pin_mut!(something_changed); match conditional_yield_until(something_changed, rtx_deadline).await { - Ok(()) => continue, + Ok(()) => match self.fin_seq_no { + Some(fin_seq_no) if self.send_unacked.get() > fin_seq_no => { + return Err(Fail::new(libc::ECONNRESET, "connection closed")); + }, + _ => continue, + }, Err(Fail { errno, cause: _ }) if errno == libc::ETIMEDOUT => { // Retransmit timeout. - + trace!("retransmit wake"); // Notify congestion control about RTO. // TODO: Is this the best place for this? // TODO: Why call into ControlBlock to get SND.UNA when congestion_control_on_rto() has access to it? @@ -373,33 +442,31 @@ impl Sender { /// Retransmits the earliest segment that has not (yet) been acknowledged by our peer. pub fn retransmit(&mut self, cb: &mut SharedControlBlock) { - if self.unacked_queue.is_empty() { - return; - } - let segment: &mut UnackedSegment = self - .unacked_queue - .get_front_mut() - .expect("just checked if queue is empty"); - - // We're retransmitting this, so we can no longer use an ACK for it as an RTT measurement (as we can't tell - // if the ACK is for the original or the retransmission). Remove the transmission timestamp from the entry. - segment.initial_tx.take(); - - // Clone the segment data for retransmission. - let data: DemiBuffer = segment.bytes.clone(); - - // TODO: Issue #198 Repacketization - we should send a full MSS (and set the FIN flag if applicable). - - // Prepare and send the segment. - let mut header: TcpHeader = cb.tcp_header(); - header.seq_num = self.send_unacked.get(); - if data.len() == 0 { - // This buffer is the end-of-send marker. Retransmit the FIN. - header.fin = true; - } else { - header.psh = true; + match self.unacked_queue.get_front_mut() { + Some(segment) => { + // We're retransmitting this, so we can no longer use an ACK for it as an RTT measurement (as we can't + // tell if the ACK is for the original or the retransmission). Remove the transmission timestamp from + // the entry. + segment.initial_tx.take(); + + // Clone the segment data for retransmission. + let data: Option = segment.bytes.as_ref().map(|b| b.clone()); + + // TODO: Issue #198 Repacketization - we should send a full MSS (and set the FIN flag if applicable). + + // Prepare and send the segment. + let mut header: TcpHeader = cb.tcp_header(); + header.seq_num = self.send_unacked.get(); + // If data exists, then this is a regular packet, otherwise, its a FIN. + if data.is_some() { + header.psh = true; + } else { + header.fin = true; + } + cb.emit(header, data); + }, + None => (), } - cb.emit(header, Some(data)); } // Process an ack. @@ -418,38 +485,13 @@ impl Sender { let mut bytes_remaining: usize = bytes_acknowledged as usize; // Remove bytes from the unacked queue. while bytes_remaining != 0 { - if let Some(mut segment) = self.unacked_queue.try_pop() { - // Add sample for RTO if we have an initial transmit time. - // Note that in the case of repacketization, an ack for the first byte is enough for the time sample because it still represents the RTO for that single byte. - // TODO: TCP timestamp support. - if let Some(initial_tx) = segment.initial_tx { - self.rto_calculator.add_sample(now - initial_tx); - } - - if segment.bytes.len() > bytes_remaining { - // Only some of the data in this segment has been acked. Remove just the acked amount. - expect_ok!( - segment.bytes.adjust(bytes_remaining), - "'segment' should contain at least 'bytes_remaining'" - ); - segment.initial_tx = None; - - // Leave this segment on the unacknowledged queue. - self.unacked_queue.push_front(segment); - break; - } - - if segment.bytes.len() == 0 { - // This buffer is the end-of-send marker. So we should only have one byte of acknowledged - // sequence space remaining (corresponding to our FIN). - debug_assert_eq!(bytes_remaining, 1); - bytes_remaining = 0; - } - - bytes_remaining -= segment.bytes.len(); - } else { - debug_assert!(false); // Shouldn't have bytes_remaining with no segments remaining in unacked_queue. - } + bytes_remaining = match self.unacked_queue.try_pop() { + Some(segment) if segment.bytes.is_none() => self.process_acked_fin(bytes_remaining, header.ack_num), + Some(segment) => self.process_acked_segment(bytes_remaining, segment, now), + None => { + unreachable!("There should be enough data in the unacked_queue for the number of bytes acked") + }, // Shouldn't have bytes_remaining with no segments remaining in unacked_queue. + }; } // Update SND.UNA to SEG.ACK. @@ -463,7 +505,7 @@ impl Sender { let retransmit_deadline_time_secs: Option = self.update_retransmit_deadline(now); #[cfg(debug_assertions)] if retransmit_deadline_time_secs.is_none() { - debug_assert_eq!(self.send_next.get(), header.ack_num); + debug_assert_eq!(self.send_next_seq_no.get(), header.ack_num); } self.retransmit_deadline_time_secs.set(retransmit_deadline_time_secs); } else { @@ -473,6 +515,50 @@ impl Sender { } } + fn process_acked_fin(&mut self, bytes_remaining: usize, ack_num: SeqNumber) -> usize { + // This buffer is the end-of-send marker. So we should only have one byte of acknowledged + // sequence space remaining (corresponding to our FIN). + debug_assert_eq!(bytes_remaining, 1); + + // Double check that the ack is for the FIN sequence number. + debug_assert_eq!( + ack_num, + self.fin_seq_no + .map(|s| { s + 1.into() }) + .expect("should have a FIN set") + ); + 0 + } + + fn process_acked_segment(&mut self, bytes_remaining: usize, mut segment: UnackedSegment, now: Instant) -> usize { + // Add sample for RTO if we have an initial transmit time. + // Note that in the case of repacketization, an ack for the first byte is enough for the time sample because it still represents the RTO for that single byte. + // TODO: TCP timestamp support. + if let Some(initial_tx) = segment.initial_tx { + self.rto_calculator.add_sample(now - initial_tx); + } + + let mut data: DemiBuffer = segment + .bytes + .take() + .expect("there should be data because this is not a FIN."); + if data.len() > bytes_remaining { + // Put this segment on the unacknowledged list. + let unacked_segment = UnackedSegment { + bytes: Some( + data.split_back(bytes_remaining) + .expect("Should be able to split back because we just checked the length"), + ), + initial_tx: None, + }; + // Leave this segment on the unacknowledged queue. + self.unacked_queue.push_front(unacked_segment); + 0 + } else { + bytes_remaining - data.len() + } + } + fn update_retransmit_deadline(&self, now: Instant) -> Option { match self.unacked_queue.get_front() { Some(UnackedSegment { @@ -514,7 +600,7 @@ impl Sender { // Get SND.NXT. pub fn get_next_seq_no(&self) -> SeqNumber { - self.send_next.get() + self.send_next_seq_no.get() } // Get the current estimate of RTO. diff --git a/src/rust/inetstack/protocols/layer4/tcp/socket.rs b/src/rust/inetstack/protocols/layer4/tcp/socket.rs index 6c76794ed..b53476e4a 100644 --- a/src/rust/inetstack/protocols/layer4/tcp/socket.rs +++ b/src/rust/inetstack/protocols/layer4/tcp/socket.rs @@ -224,11 +224,8 @@ impl SharedTcpSocket { // Send synchronously. match self.state { SocketState::Established(ref mut socket) => { - let size: usize = buf.len(); - // Send the packet. - socket.send(buf)?; // Wait for ack. - socket.push(size).await + socket.push(buf).await }, _ => unreachable!("State machine check should ensure that this socket is connected"), }