Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed bug: initial state of an epoll eid for listener socket not set #2444

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions apps/apputil.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "netinet_any.h"
#include "utilities.h"
#include "srt.h"

#if _WIN32

Expand Down Expand Up @@ -336,4 +337,41 @@ std::string OptionHelpItem(const OptionName& o);
const char* SRTClockTypeStr();
void PrintLibVersion();


namespace srt
{

struct OptionSetterProxy
{
SRTSOCKET s;
int result = -1;

OptionSetterProxy(SRTSOCKET ss): s(ss) {}

struct OptionProxy
{
const OptionSetterProxy& parent;
SRT_SOCKOPT opt;

template<class Type>
OptionProxy& operator=(Type&& val)
{
Type vc(val);
srt_setsockflag(parent.s, opt, &vc, sizeof vc);
return *this;
}
};

OptionProxy operator[](SRT_SOCKOPT opt) const
{
return OptionProxy {*this, opt};
}
};

inline OptionSetterProxy setopt(SRTSOCKET socket)
{
return OptionSetterProxy(socket);
}

}
#endif // INC_SRT_APPCOMMON_H
53 changes: 43 additions & 10 deletions configure-data.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,24 @@ proc preprocess {} {
}
}

proc GetCompilerCommand {} {
set compiler_map {
cc c++
gcc g++
}

proc GetCompilerCmdName {compiler lang} {
if {$lang == "c++"} {
if { [dict exists $::compiler_map $compiler] } {
return [dict get $::compiler_map $compiler]
}

return ${compiler}++
}

return $compiler
}

proc GetCompilerCommand { {lang {}} } {
# Expect that the compiler was set through:
# --with-compiler-prefix
# --cmake-c[++]-compiler
Expand All @@ -204,21 +221,25 @@ proc GetCompilerCommand {} {

if { [info exists ::optval(--with-compiler-prefix)] } {
set prefix $::optval(--with-compiler-prefix)
return ${prefix}$compiler
return ${prefix}[GetCompilerCmdName $compiler $lang]
} else {
return $compiler
return [GetCompilerCmdName $compiler $lang]
}

if { [info exists ::optval(--cmake-c-compiler)] } {
return $::optval(--cmake-c-compiler)
if { $lang != "c++" } {
if { [info exists ::optval(--cmake-c-compiler)] } {
return $::optval(--cmake-c-compiler)
}
}

if { [info exists ::optval(--cmake-c++-compiler)] } {
return $::optval(--cmake-c++-compiler)
}
if { $lang != "c" } {
if { [info exists ::optval(--cmake-c++-compiler)] } {
return $::optval(--cmake-c++-compiler)
}

if { [info exists ::optval(--cmake-cxx-compiler)] } {
return $::optval(--cmake-cxx-compiler)
if { [info exists ::optval(--cmake-cxx-compiler)] } {
return $::optval(--cmake-cxx-compiler)
}
}

puts "NOTE: Cannot obtain compiler, assuming toolchain file will do what's necessary"
Expand Down Expand Up @@ -284,6 +305,18 @@ proc postprocess {} {
} else {
puts "CONFIGURE: default compiler used"
}

# Complete the variables before calling cmake, otherwise it might not work

if { [info exists ::optval(--with-compiler-type)] } {
if { ![info exists ::optval(--cmake-c-compiler)] } {
lappend ::cmakeopt "-DCMAKE_C_COMPILER=[GetCompilerCommand c]"
}

if { ![info exists ::optval(--cmake-c++-compiler)] } {
lappend ::cmakeopt "-DCMAKE_CXX_COMPILER=[GetCompilerCommand c++]"
}
}
}

if { $::srt_name != "" } {
Expand Down
89 changes: 82 additions & 7 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,12 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
}
}

// This is set to true only if the group is to be reported
// from the accept call for the first time. Once it is extracted
// this way, this flag is cleared.
if (should_submit_to_accept)
g->m_bPending = true;

// Update the status in the group so that the next
// operation can include the socket in the group operation.
CUDTGroup::SocketData* gm = ns->m_GroupMemberData;
Expand Down Expand Up @@ -843,6 +849,69 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
return 1;
}

SRT_EPOLL_T srt::CUDTSocket::getListenerEvents()
{
// You need to check EVERY socket that has been queued
// and verify its internals. With independent socket the
// matter is simple - if it's present, you light up the
// SRT_EPOLL_ACCEPT flag.

#if !ENABLE_BONDING
//#if 1
Fixed Show fixed Hide fixed
ScopedLock accept_lock (m_AcceptLock);

// Make it simplified here - nonempty container = have acceptable sockets.
// Might make sometimes spurious acceptance, but this can also happen when
// the incoming accepted socket was suddenly broken.
return m_QueuedSockets.empty() ? 0 : int(SRT_EPOLL_ACCEPT);

#else // Could do #endif here, but the compiler would complain about unreachable code.

set<SRTSOCKET> sockets_copy;
{
ScopedLock accept_lock (m_AcceptLock);
sockets_copy = m_QueuedSockets;
}
return CUDT::uglobal().checkQueuedSocketsEvents(sockets_copy);

#endif
}

#if ENABLE_BONDING
int srt::CUDTUnited::checkQueuedSocketsEvents(const set<SRTSOCKET>& sockets)
{
SRT_EPOLL_T flags = 0;

// But with the member sockets an appropriate check must be
// done first: if this socket belongs to a group that is
// already in the connected state, you should light up the
// SRT_EPOLL_UPDATE flag instead. This flag is only for
// internal informing the waiters on the listening sockets
// that they should re-read the group list and re-check readiness.

// Now we can do lock once and for all
for (set<SRTSOCKET>::iterator i = sockets.begin(); i != sockets.end(); ++i)
{
CUDTSocket* s = locateSocket_LOCKED(*i);
if (!s)
continue; // wiped in the meantime - ignore

// If this pending socket is a group member, but the group
// to which it belongs is NOT waiting to be accepted, then
// light up the UPDATE event only. Light up ACCEPT only if
// this is a single socket, or this single socket has turned
// the mirror group to be first time available for accept(),
// and this accept() hasn't been done yet.
if (s->m_GroupOf && !s->m_GroupOf->groupPending())
flags |= SRT_EPOLL_UPDATE;
else
flags |= SRT_EPOLL_ACCEPT;
}

return flags;
}
#endif

// static forwarder
int srt::CUDT::installAcceptHook(SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq)
{
Expand Down Expand Up @@ -1164,6 +1233,7 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int
{
u = s->m_GroupOf->m_GroupID;
s->core().m_config.iGroupConnect = 1; // should be derived from ls, but make sure
s->m_GroupOf->m_bPending = false;

// Mark the beginning of the connection at the moment
// when the group ID is returned to the app caller
Expand Down Expand Up @@ -2366,14 +2436,19 @@ int srt::CUDTUnited::epoll_add_usock(const int eid, const SRTSOCKET u, const int
}
#endif

CUDTSocket* s = locateSocket(u);
if (s)
// The call to epoll_add_usock_INTERNAL is expected
// to be called under m_GlobControlLock, so use this lock here, too.
{
ret = epoll_add_usock_INTERNAL(eid, s, events);
}
else
{
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL);
ScopedLock cs (m_GlobControlLock);
CUDTSocket* s = locateSocket_LOCKED(u);
if (s)
{
ret = epoll_add_usock_INTERNAL(eid, s, events);
}
else
{
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL);
}
}

return ret;
Expand Down
7 changes: 7 additions & 0 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class CUDTSocket

unsigned int m_uiBackLog; //< maximum number of connections in queue

SRT_EPOLL_T getListenerEvents();

// XXX A refactoring might be needed here.

// There are no reasons found why the socket can't contain a list iterator to a
Expand Down Expand Up @@ -273,6 +275,11 @@ class CUDTUnited
int& w_error,
CUDT*& w_acpu);

#if ENABLE_BONDING
SRT_ATTR_REQUIRES(m_GlobControlLock)
int checkQueuedSocketsEvents(const std::set<SRTSOCKET>& sockets);
#endif

int installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq);
int installConnectHook(const SRTSOCKET lsn, srt_connect_callback_fn* hook, void* opaq);

Expand Down
20 changes: 20 additions & 0 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11675,6 +11675,26 @@ void srt::CUDT::addEPoll(const int eid)
m_sPollID.insert(eid);
leaveCS(uglobal().m_EPoll.m_EPollLock);

//*
if (m_bListening)
{
// A listener socket can only get readiness on SRT_EPOLL_ACCEPT
// (which has the same value as SRT_EPOLL_IN), or sometimes
// also SRT_EPOLL_UPDATE. All interesting fields for that purpose
// are contained in the CUDTSocket class, so redirect there.
SRT_EPOLL_T events = m_parent->getListenerEvents();

// Only light up the events that were returned, do nothing if none is ready,
// the "no event" state is the default.
if (events)
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, events, true);

// You don't check anything else here - a listener socket can be only
// used for listening and nothing else.
return;
}
// */

if (!stillConnected())
return;

Expand Down
1 change: 1 addition & 0 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
, m_RcvBaseSeqNo(SRT_SEQNO_NONE)
, m_bOpened(false)
, m_bConnected(false)
, m_bPending(false)
, m_bClosing(false)
, m_iLastSchedSeqNo(SRT_SEQNO_NONE)
, m_iLastSchedMsgNo(SRT_MSGNO_NONE)
Expand Down
22 changes: 20 additions & 2 deletions srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ class CUDTGroup
return m_Group.empty();
}

bool groupPending()
{
return m_bPending;
}

void setGroupConnected();

int send(const char* buf, int len, SRT_MSGCTRL& w_mc);
Expand Down Expand Up @@ -659,8 +664,21 @@ class CUDTGroup
// from the first delivering socket will be taken as a good deal.
sync::atomic<int32_t> m_RcvBaseSeqNo;

bool m_bOpened; // Set to true when at least one link is at least pending
bool m_bConnected; // Set to true on first link confirmed connected
/// True: at least one socket has joined the group in at least pending state
bool m_bOpened;

/// True: at least one socket is connected, even if pending from the listener
bool m_bConnected;

/// True: this group was created on the listner side for the first socket
/// that is pending connection, so the group is about to be reported for the
/// srt_accept() call, but the application hasn't retrieved the group yet.
/// Not in use in case of caller-side groups.
// NOTE: using atomic in otder to allow this variable to be changed independently
// on any mutex locks.
sync::atomic<bool> m_bPending;

/// True: the group was requested to close and it should not allow any operations.
bool m_bClosing;

// There's no simple way of transforming config
Expand Down
Loading