Skip to content

Commit

Permalink
Added extra fixes for data races
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikołaj Małecki committed Feb 28, 2024
1 parent 0f0597a commit a82fd26
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 40 deletions.
53 changes: 21 additions & 32 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1905,7 +1905,7 @@ int srt::CUDTUnited::close(const SRTSOCKET u)
return 0;
}
#endif

#if ENABLE_HEAVY_LOGGING
// Wrapping the log into a destructor so that it
// is printed AFTER the destructor of SocketKeeper.
struct ForceDestructor
Expand All @@ -1916,14 +1916,15 @@ int srt::CUDTUnited::close(const SRTSOCKET u)
{
if (ps) // Could be not acquired by SocketKeeper, occasionally
{
LOGC(smlog.Note, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy());
HLOGC(smlog.Debug, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy());
}
}
} fod;
#endif

SocketKeeper k(*this, u, ERH_THROW);
fod.ps = k.socket;
LOGC(smlog.Note, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy());
IF_HEAVY_LOGGING(fod.ps = k.socket);
HLOGC(smlog.Debug, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy());
int ret = close(k.socket);

return ret;
Expand Down Expand Up @@ -2573,37 +2574,25 @@ srt::CUDTSocket* srt::CUDTUnited::locateAcquireSocket(SRTSOCKET u, ErrorHandling

bool srt::CUDTUnited::acquireSocket(CUDTSocket* s)
{
/*
// Note that before using this function you must be certain
// that the socket isn't broken already and it still has at least
// one more GC cycle to live. In other words, you must be certain
// that this pointer passed here isn't dangling and was obtained
// directly from m_Sockets, or even better, has been acquired
// by some other functionality already, which is only about to
// be released earlier than you need.
ScopedLock cg(m_GlobControlLock);
// JUST IN CASE, try to find the socket in m_Sockets (NOT in m_ClosedSockets).
// If it's not there, just pretend it's already deleted. The m_ClosedSockets
// is a container that keeps sockets that are being still in use of other threads,
// but were dispatched or acquired before another activity has requested them
// to be deleted.
// This uses simply a pointer value, so it should be safe even if the pointer
// was dangling. We state it's virtually impossible for a non-paused thread
// to get in a situation of having the reclaimed memory after a socket removed
// from m_Sockets to be reassigned to another new socket. Note that the socket
// being normally passed here as argument is a socket that was previously
// dispatched from somewhere else.
for (sockets_t::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++i)
s->apiAcquire();
// Keep the lock so that no one changes anything in the meantime.
// If the socket m_Status == SRTS_CLOSED (set by setClosed()), then
// this socket is no longer present in the m_Sockets container
if (s->m_Status >= SRTS_BROKEN)
{
CUDTSocket* is = i->second;
if (s == is)
{
*/
s->apiAcquire();
return true;
/*
}
s->apiRelease();
return false;
}
s->apiAcquire();
return false;
*/

return true;
}

srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn)
Expand Down
13 changes: 9 additions & 4 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7087,7 +7087,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_

do
{
if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady(steady_clock::now()))
if (stillConnected() && !timeout && !isRcvBufferReady())
{
/* Kick TsbPd thread to schedule next wakeup (if running) */
if (m_bTsbPd)
Expand Down Expand Up @@ -8726,7 +8726,10 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr
// srt_recvfile (which doesn't make any sense), you'll have a deadlock.
if (m_config.bDriftTracer)
{
enterCS(m_RcvBufferLock);
const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt);
leaveCS(m_RcvBufferLock);

#if ENABLE_BONDING
if (drift_updated && m_parent->m_GroupOf)
{
Expand Down Expand Up @@ -9997,10 +10000,12 @@ int srt::CUDT::checkLazySpawnTsbPdThread()
{
const bool need_tsbpd = m_bTsbPd || m_bGroupTsbPd;

if (need_tsbpd && !m_RcvTsbPdThread.joinable())
{
ScopedLock lock(m_RcvTsbPdStartupLock);
if (!need_tsbpd)
return 0;

ScopedLock lock(m_RcvTsbPdStartupLock);
if (!m_RcvTsbPdThread.joinable())
{
if (m_bClosing) // Check again to protect join() in CUDT::releaseSync()
return -1;

Expand Down
7 changes: 3 additions & 4 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,7 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst

EReadStatus read_st = rst;
EConnectStatus conn_st = cst;
/*

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id);
if (!sk.socket)
{
Expand All @@ -945,7 +945,7 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
toRemove.push_back(*i);
continue;
}
// */


if (cst != CONN_RENDEZVOUS && dest_id != 0)
{
Expand Down Expand Up @@ -993,15 +993,14 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
{
HLOGC(cnlog.Debug, log << "updateConnStatus: COMPLETING dep objects update on failed @" << i->id);
remove(i->id);
/*

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id);
if (!sk.socket)
{
// This actually shall never happen, so it's a kind of paranoid check.
LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, NOT ACCESSING its contents");
continue;
}
// */

// Setting m_bConnecting to false, and need to remove the socket from the rendezvous queue
// because the next CUDT::close will not remove it from the queue when m_bConnecting = false,
Expand Down

0 comments on commit a82fd26

Please sign in to comment.