diff --git a/srtcore/buffer_rcv.h b/srtcore/buffer_rcv.h index 3c60be21db..f783ac2a27 100644 --- a/srtcore/buffer_rcv.h +++ b/srtcore/buffer_rcv.h @@ -200,6 +200,20 @@ class CRcvBuffer return (m_iMaxPosOff == 0); } + /// Returns the currently used number of cells, including + /// gaps with empty cells, or in other words, the distance + /// between the initial position and the youngest received packet. + size_t size() const + { + return m_iMaxPosOff; + } + + // Returns true if the buffer is full. Requires locking. + bool full() const + { + return size() == capacity(); + } + /// Return buffer capacity. /// One slot had to be empty in order to tell the difference between "empty buffer" and "full buffer". /// E.g. m_iFirstNonreadPos would again point to m_iStartPos if m_szSize entries are added continiously. @@ -333,9 +347,8 @@ class CRcvBuffer EntryStatus status; }; - //static Entry emptyEntry() { return Entry { NULL, EntryState_Empty }; } - - FixedArray m_entries; + typedef FixedArray entries_t; + entries_t m_entries; const size_t m_szSize; // size of the array of units (buffer) CUnitQueue* m_pUnitQueue; // the shared unit queue diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 441e92369c..bc8bcd74c6 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -298,7 +298,7 @@ void srt::CUDT::construct() m_iPeerTsbPdDelay_ms = 0; m_bPeerTsbPd = false; m_bTsbPd = false; - m_bTsbPdAckWakeup = false; + m_bTsbPdNeedsWakeup = false; m_bGroupTsbPd = false; m_bPeerTLPktDrop = false; m_bBufferWasFull = false; @@ -5375,7 +5375,7 @@ void * srt::CUDT::tsbpd(void* param) CUniqueSync recvdata_lcc (self->m_RecvLock, self->m_RecvDataCond); CSync tsbpd_cc(self->m_RcvTsbPdCond, recvdata_lcc.locker()); - self->m_bTsbPdAckWakeup = true; + self->m_bTsbPdNeedsWakeup = true; while (!self->m_bClosing) { steady_clock::time_point tsNextDelivery; // Next packet delivery time @@ -5395,6 +5395,21 @@ void * srt::CUDT::tsbpd(void* param) const bool is_time_to_deliver = !is_zero(info.tsbpd_time) && (tnow >= info.tsbpd_time); tsNextDelivery = info.tsbpd_time; +#if ENABLE_HEAVY_LOGGING + if (info.seqno == SRT_SEQNO_NONE) + { + HLOGC(tslog.Debug, log << self->CONID() << "sok/tsbpd: packet check: NO PACKETS"); + } + else + { + HLOGC(tslog.Debug, log << self->CONID() << "sok/tsbpd: packet check: %" + << info.seqno << " T=" << FormatTime(tsNextDelivery) + << " diff-now-playtime=" << FormatDuration(tnow - tsNextDelivery) + << " ready=" << is_time_to_deliver + << " ondrop=" << info.seq_gap); + } +#endif + if (!self->m_bTLPktDrop) { rxready = !info.seq_gap && is_time_to_deliver; @@ -5430,8 +5445,8 @@ void * srt::CUDT::tsbpd(void* param) if (rxready) { HLOGC(tslog.Debug, - log << self->CONID() << "tsbpd: PLAYING PACKET seq=" << info.seqno << " (belated " - << (count_milliseconds(steady_clock::now() - info.tsbpd_time)) << "ms)"); + log << self->CONID() << "tsbpd: PLAYING PACKET seq=" << info.seqno << " (belated " + << FormatDuration(steady_clock::now() - info.tsbpd_time) << ")"); /* * There are packets ready to be delivered * signal a waiting "recv" call if there is any data available @@ -5494,6 +5509,8 @@ void * srt::CUDT::tsbpd(void* param) if (self->m_bClosing) break; + SRT_ATR_UNUSED bool bWokeUpOnSignal = true; + if (!is_zero(tsNextDelivery)) { IF_HEAVY_LOGGING(const steady_clock::duration timediff = tsNextDelivery - tnow); @@ -5501,12 +5518,12 @@ void * srt::CUDT::tsbpd(void* param) * Buffer at head of queue is not ready to play. * Schedule wakeup when it will be. */ - self->m_bTsbPdAckWakeup = false; + self->m_bTsbPdNeedsWakeup = false; HLOGC(tslog.Debug, - log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << info.seqno - << " T=" << FormatTime(tsNextDelivery) << " - waiting " << count_milliseconds(timediff) << "ms"); + log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << info.seqno + << " T=" << FormatTime(tsNextDelivery) << " - waiting " << FormatDuration(timediff)); THREAD_PAUSED(); - tsbpd_cc.wait_until(tsNextDelivery); + bWokeUpOnSignal = tsbpd_cc.wait_until(tsNextDelivery); THREAD_RESUMED(); } else @@ -5523,13 +5540,15 @@ void * srt::CUDT::tsbpd(void* param) * - Closing the connection */ HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack"); - self->m_bTsbPdAckWakeup = true; + self->m_bTsbPdNeedsWakeup = true; THREAD_PAUSED(); tsbpd_cc.wait(); THREAD_RESUMED(); } - HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!"); + HLOGC(tslog.Debug, + log << self->CONID() << "tsbpd: WAKE UP [" << (bWokeUpOnSignal ? "signal" : "timeout") << "]!!! - " + << "NOW=" << FormatTime(steady_clock::now())); } THREAD_EXIT(); HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING"); @@ -6868,6 +6887,12 @@ bool srt::CUDT::isRcvBufferReadyNoLock() const return m_pRcvBuffer->isRcvDataReady(steady_clock::now()); } +bool srt::CUDT::isRcvBufferFull() const +{ + ScopedLock lck(m_RcvBufferLock); + return m_pRcvBuffer->full(); +} + // int by_exception: accepts values of CUDTUnited::ErrorHandling: // - 0 - by return value // - 1 - by exception @@ -7649,12 +7674,13 @@ bool srt::CUDT::updateCC(ETransmissionEvent evt, const EventVariant arg) // - m_dPktSndPeriod // - m_dCWndSize m_tdSendInterval = microseconds_from((int64_t)m_CongCtl->pktSndPeriod_us()); - m_dCongestionWindow = m_CongCtl->cgWindowSize(); + const double cgwindow = m_CongCtl->cgWindowSize(); + m_iCongestionWindow = cgwindow; #if ENABLE_HEAVY_LOGGING HLOGC(rslog.Debug, - log << CONID() << "updateCC: updated values from congctl: interval=" << count_microseconds(m_tdSendInterval) << " us (" - << "tk (" << m_CongCtl->pktSndPeriod_us() << "us) cgwindow=" - << std::setprecision(3) << m_dCongestionWindow); + log << CONID() << "updateCC: updated values from congctl: interval=" << FormatDuration(m_tdSendInterval) + << " (cfg:" << m_CongCtl->pktSndPeriod_us() << "us) cgwindow=" + << std::setprecision(3) << cgwindow); #endif } @@ -8055,7 +8081,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) /* Newly acknowledged data, signal TsbPD thread */ CUniqueSync tslcc (m_RecvLock, m_RcvTsbPdCond); // m_bTsbPdAckWakeup is protected by m_RecvLock in the tsbpd() thread - if (m_bTsbPdAckWakeup) + if (m_bTsbPdNeedsWakeup) tslcc.notify_one(); } else @@ -8118,7 +8144,8 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) else if (!bNeedFullAck) { // Not possible (m_iRcvCurrSeqNo+1 <% m_iRcvLastAck ?) - LOGC(xtlog.Error, log << CONID() << "sendCtrl(UMSG_ACK): IPE: curr %" << ack << " <% last %" << m_iRcvLastAck); + LOGC(xtlog.Error, log << CONID() << "sendCtrl(UMSG_ACK): IPE: curr(" << reason << ") %" + << ack << " <% last %" << m_iRcvLastAck); return nbsent; } diff --git a/srtcore/core.h b/srtcore/core.h index eb1edffc93..d865a48e15 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -323,6 +323,7 @@ class CUDT #endif int32_t rcvSeqNo() const { return m_iRcvCurrSeqNo; } + SRT_ATTR_REQUIRES(m_RecvAckLock) int flowWindowSize() const { return m_iFlowWindowSize; } int32_t deliveryRate() const { return m_iDeliveryRate; } int bandwidth() const { return m_iBandwidth; } @@ -370,6 +371,7 @@ class CUDT /// Returns the number of packets in flight (sent, but not yet acknowledged). /// @returns The number of packets in flight belonging to the interval [0; ...) + SRT_ATTR_REQUIRES(m_RecvAckLock) int32_t getFlightSpan() const { return getFlightSpan(m_iSndLastAck, m_iSndCurrSeqNo); @@ -674,6 +676,8 @@ class CUDT /// the receiver fresh loss list. void unlose(const CPacket& oldpacket); void dropFromLossLists(int32_t from, int32_t to); + + SRT_ATTR_REQUIRES(m_RecvAckLock) bool getFirstNoncontSequence(int32_t& w_seq, std::string& w_log_reason); SRT_ATTR_EXCLUDES(m_ConnectionLock) @@ -729,6 +733,9 @@ class CUDT SRT_ATTR_REQUIRES(m_RcvBufferLock) bool isRcvBufferReadyNoLock() const; + SRT_ATTR_EXCLUDES(m_RcvBufferLock) + bool isRcvBufferFull() const; + // TSBPD thread main function. static void* tsbpd(void* param); @@ -964,7 +971,7 @@ class CUDT sync::CThread m_RcvTsbPdThread; // Rcv TsbPD Thread handle sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock - bool m_bTsbPdAckWakeup; // Signal TsbPd thread on Ack sent + bool m_bTsbPdNeedsWakeup; // Signal TsbPd thread to wake up on RCV buffer state change. sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creating and joining CallbackHolder m_cbAcceptHook; @@ -1107,7 +1114,8 @@ class CUDT /// @return -2 The incoming packet exceeds the expected sequence by more than a length of the buffer (irrepairable discrepancy). int handleSocketPacketReception(const std::vector& incoming, bool& w_new_inserted, bool& w_was_sent_in_order, CUDT::loss_seqs_t& w_srt_loss_seqs); - /// Get the packet's TSBPD time. + /// Get the packet's TSBPD time - + /// the time when it is passed to the reading application. /// The @a grp passed by void* is not used yet /// and shall not be used when ENABLE_BONDING=0. time_point getPktTsbPdTime(void* grp, const CPacket& packet); diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 3216262485..23c44fc013 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -1092,8 +1092,8 @@ bool srt::CRendezvousQueue::qualifyToHandle(EReadStatus rst, if ((rst == RST_AGAIN || i->m_iID != iDstSockID) && tsNow <= tsRepeat) { HLOGC(cnlog.Debug, - log << "RID:@" << i->m_iID << std::fixed << count_microseconds(tsNow - tsLastReq) / 1000.0 - << " ms passed since last connection request."); + log << "RID:@" << i->m_iID << " " << FormatDuration(tsNow - tsLastReq) + << " passed since last connection request."); continue; }