From 7923aae10e274d1c64258b0f71e9e951b11aca7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 25 Aug 2022 10:45:13 +0200 Subject: [PATCH 01/15] Added a UT that confirms the bug. Added an option setter utility. --- apps/apputil.hpp | 37 ++++++++++++++++++++++- test/test_epoll.cpp | 72 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 107 insertions(+), 2 deletions(-) diff --git a/apps/apputil.hpp b/apps/apputil.hpp index acb28d076..c6e013f02 100644 --- a/apps/apputil.hpp +++ b/apps/apputil.hpp @@ -20,6 +20,7 @@ #include "netinet_any.h" #include "utilities.h" +#include "srt.h" #if _WIN32 @@ -332,8 +333,42 @@ inline bool OptionPresent(const options_t& options, const std::set& options_t ProcessOptions(char* const* argv, int argc, std::vector scheme); std::string OptionHelpItem(const OptionName& o); - const char* SRTClockTypeStr(); void PrintLibVersion(); + +namespace srt +{ + +struct OptionSetterProxy +{ + SRTSOCKET s = -1; + int result = 0; + + struct OptionProxy + { + OptionSetterProxy& parent; + SRT_SOCKOPT opt; + + template + OptionProxy& operator=(Type&& val) + { + Type vc(val); + srt_setsockflag(parent.s, opt, &vc, sizeof vc); + return *this; + } + }; + + OptionProxy operator[](SRT_SOCKOPT opt) + { + return OptionProxy {*this, opt}; + } +}; + +inline OptionSetterProxy setopt(SRTSOCKET socket) +{ + return OptionSetterProxy {socket}; +} + +} #endif // INC_SRT_APPCOMMON_H diff --git a/test/test_epoll.cpp b/test/test_epoll.cpp index d5e044d7e..497bce692 100644 --- a/test/test_epoll.cpp +++ b/test/test_epoll.cpp @@ -6,7 +6,7 @@ #include "gtest/gtest.h" #include "api.h" #include "epoll.h" - +#include "apputil.hpp" using namespace std; using namespace srt; @@ -561,6 +561,76 @@ TEST(CEPoll, ThreadedUpdate) EXPECT_EQ(srt_cleanup(), 0); } +TEST(CEPoll, LateListenerReady) +{ + int server_sock = srt_create_socket(), caller_sock = srt_create_socket(); + + sockaddr_in sa; + memset(&sa, 0, sizeof sa); + sa.sin_family = AF_INET; + sa.sin_port = htons(5555); + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + + srt_bind(server_sock, (sockaddr*)& sa, sizeof(sa)); + srt_listen(server_sock, 1); + + srt::setopt(server_sock)[SRTO_RCVSYN] = false; + + // Ok, the listener socket is ready; now make a call, but + // do not do anything on the listener socket yet. + +// This macro is to manipulate with the moment when the call is made +// towards the eid subscription. If 1, then the call is made first, +// and then subsciption after a 1s time. Set it to 0 to see how it +// works when the subscription is made first, so the readiness is from +// the listener changing the state. +#define LATE_CALL 1 + +#if LATE_CALL + + // We don't need the caller to be async, it can hang up here. + auto connect_res = std::async(std::launch::async, [&caller_sock, &sa]() { + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + }); + +#endif + this_thread::sleep_for(chrono::milliseconds(1000)); + + // What is important is that the accepted socket is now reporting in + // on the listener socket. So let's create an epoll. + + int eid = srt_epoll_create(); + + // and add this listener to it + int modes = SRT_EPOLL_IN; + EXPECT_NE(srt_epoll_add_usock(eid, server_sock, &modes), SRT_ERROR); + +#if !LATE_CALL + + // We don't need the caller to be async, it can hang up here. + auto connect_res = std::async(std::launch::async, [&caller_sock, &sa]() { + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + }); + +#endif + + // And see now if the waiting accepted socket reports it. + SRT_EPOLL_EVENT fdset[1]; + EXPECT_EQ(srt_epoll_uwait(eid, fdset, 1, 5000), 1); + + sockaddr_in scl; + int sclen = sizeof scl; + SRTSOCKET sock = srt_accept(server_sock, (sockaddr*)& scl, &sclen); + EXPECT_NE(sock, SRT_INVALID_SOCK); + + EXPECT_EQ(connect_res.get(), SRT_SUCCESS); + + srt_epoll_release(eid); + srt_close(sock); + srt_close(server_sock); + srt_close(caller_sock); +} + class TestEPoll: public testing::Test { From 6cf64f5944408311fd0930f14dff6dbd87995cf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 25 Aug 2022 14:55:51 +0200 Subject: [PATCH 02/15] Fixed clang errors. Added compiler flags to configure to prevent changes by google test --- apps/apputil.hpp | 13 +++++++----- configure-data.tcl | 53 +++++++++++++++++++++++++++++++++++++--------- 2 files changed, 51 insertions(+), 15 deletions(-) diff --git a/apps/apputil.hpp b/apps/apputil.hpp index c6e013f02..c79cf984e 100644 --- a/apps/apputil.hpp +++ b/apps/apputil.hpp @@ -333,6 +333,7 @@ inline bool OptionPresent(const options_t& options, const std::set& options_t ProcessOptions(char* const* argv, int argc, std::vector scheme); std::string OptionHelpItem(const OptionName& o); + const char* SRTClockTypeStr(); void PrintLibVersion(); @@ -342,12 +343,14 @@ namespace srt struct OptionSetterProxy { - SRTSOCKET s = -1; - int result = 0; + SRTSOCKET s; + int result = -1; + + OptionSetterProxy(SRTSOCKET ss): s(ss) {} struct OptionProxy { - OptionSetterProxy& parent; + const OptionSetterProxy& parent; SRT_SOCKOPT opt; template @@ -359,7 +362,7 @@ struct OptionSetterProxy } }; - OptionProxy operator[](SRT_SOCKOPT opt) + OptionProxy operator[](SRT_SOCKOPT opt) const { return OptionProxy {*this, opt}; } @@ -367,7 +370,7 @@ struct OptionSetterProxy inline OptionSetterProxy setopt(SRTSOCKET socket) { - return OptionSetterProxy {socket}; + return OptionSetterProxy(socket); } } diff --git a/configure-data.tcl b/configure-data.tcl index eace99da1..40500a721 100644 --- a/configure-data.tcl +++ b/configure-data.tcl @@ -190,7 +190,24 @@ proc preprocess {} { } } -proc GetCompilerCommand {} { +set compiler_map { + cc c++ + gcc g++ +} + +proc GetCompilerCmdName {compiler lang} { + if {$lang == "c++"} { + if { [dict exists $::compiler_map $compiler] } { + return [dict get $::compiler_map $compiler] + } + + return ${compiler}++ + } + + return $compiler +} + +proc GetCompilerCommand { {lang {}} } { # Expect that the compiler was set through: # --with-compiler-prefix # --cmake-c[++]-compiler @@ -203,21 +220,25 @@ proc GetCompilerCommand {} { if { [info exists ::optval(--with-compiler-prefix)] } { set prefix $::optval(--with-compiler-prefix) - return ${prefix}$compiler + return ${prefix}[GetCompilerCmdName $compiler $lang] } else { - return $compiler + return [GetCompilerCmdName $compiler $lang] } - if { [info exists ::optval(--cmake-c-compiler)] } { - return $::optval(--cmake-c-compiler) + if { $lang != "c++" } { + if { [info exists ::optval(--cmake-c-compiler)] } { + return $::optval(--cmake-c-compiler) + } } - if { [info exists ::optval(--cmake-c++-compiler)] } { - return $::optval(--cmake-c++-compiler) - } + if { $lang != "c" } { + if { [info exists ::optval(--cmake-c++-compiler)] } { + return $::optval(--cmake-c++-compiler) + } - if { [info exists ::optval(--cmake-cxx-compiler)] } { - return $::optval(--cmake-cxx-compiler) + if { [info exists ::optval(--cmake-cxx-compiler)] } { + return $::optval(--cmake-cxx-compiler) + } } puts "NOTE: Cannot obtain compiler, assuming toolchain file will do what's necessary" @@ -283,6 +304,18 @@ proc postprocess {} { } else { puts "CONFIGURE: default compiler used" } + + # Complete the variables before calling cmake, otherwise it might not work + + if { [info exists ::optval(--with-compiler-type)] } { + if { ![info exists ::optval(--cmake-c-compiler)] } { + lappend ::cmakeopt "-DCMAKE_C_COMPILER=[GetCompilerCommand c]" + } + + if { ![info exists ::optval(--cmake-c++-compiler)] } { + lappend ::cmakeopt "-DCMAKE_CXX_COMPILER=[GetCompilerCommand c++]" + } + } } if { $::srt_name != "" } { From 068e8a9b3de03296be6222b8788dd53ed8f62542 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Fri, 26 Aug 2022 13:27:19 +0200 Subject: [PATCH 03/15] Added checking of accept candidates. In-group check disabled and left dead code (this solution deadlocks) --- srtcore/api.cpp | 58 +++++++++++++++++++++++++++++++++++++++++++++ srtcore/api.h | 4 ++++ srtcore/core.cpp | 20 ++++++++++++++++ srtcore/group.h | 5 ++++ test/test_epoll.cpp | 4 ++++ 5 files changed, 91 insertions(+) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index d575cd968..642067a3f 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -835,6 +835,64 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, return 1; } +SRT_EPOLL_T srt::CUDTSocket::getListenerEvents() +{ + // You need to check EVERY socket that has been queued + // and verify its internals. With independent socket the + // matter is simple - if it's present, you light up the + // SRT_EPOLL_ACCEPT flag. + +//#if !ENABLE_BONDING +#if 1 + ScopedLock accept_lock (m_AcceptLock); + + // Make it simplified here - nonempty container = have acceptable sockets. + // Might make sometimes spurious acceptance, but this can also happen when + // the incoming accepted socket was suddenly broken. + return m_QueuedSockets.empty() ? 0 : int(SRT_EPOLL_ACCEPT); + +#else // Could do #endif here, but the compiler would complain about unreachable code. + + set sockets_copy; + { + ScopedLock accept_lock (m_AcceptLock); + sockets_copy = m_QueuedSockets; + } + return CUDT::uglobal().checkQueuedSocketsEvents(sockets_copy); + +#endif +} + +#if ENABLE_BONDING +int srt::CUDTUnited::checkQueuedSocketsEvents(const set& sockets) +{ + SRT_EPOLL_T flags = 0; + + // But with the member sockets an appropriate check must be + // done first: if this socket belongs to a group that is + // already in the connected state, you should light up the + // SRT_EPOLL_UPDATE flag instead. This flag is only for + // internal informing the waiters on the listening sockets + // that they should re-read the group list and re-check readiness. + + // Now we can do lock once and for all + //ScopedLock glk (m_GlobControlLock); // XXX DEADLOCKS + + for (set::iterator i = sockets.begin(); i != sockets.end(); ++i) + { + CUDTSocket* s = locateSocket_LOCKED(*i); + if (!s) + continue; // wiped in the meantime - ignore + if (s->m_GroupOf && s->m_GroupOf->groupConnected()) + flags |= SRT_EPOLL_UPDATE; + else + flags |= SRT_EPOLL_ACCEPT; + } + + return flags; +} +#endif + // static forwarder int srt::CUDT::installAcceptHook(SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq) { diff --git a/srtcore/api.h b/srtcore/api.h index ca812bf9e..6254812a4 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -158,6 +158,8 @@ class CUDTSocket unsigned int m_uiBackLog; //< maximum number of connections in queue + SRT_EPOLL_T getListenerEvents(); + // XXX A refactoring might be needed here. // There are no reasons found why the socket can't contain a list iterator to a @@ -264,6 +266,8 @@ class CUDTUnited int& w_error, CUDT*& w_acpu); + int checkQueuedSocketsEvents(const std::set& sockets); + int installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq); int installConnectHook(const SRTSOCKET lsn, srt_connect_callback_fn* hook, void* opaq); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 93224f418..18b9c013c 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -11515,6 +11515,26 @@ void srt::CUDT::addEPoll(const int eid) m_sPollID.insert(eid); leaveCS(uglobal().m_EPoll.m_EPollLock); + //* + if (m_bListening) + { + // A listener socket can only get readiness on SRT_EPOLL_ACCEPT + // (which has the same value as SRT_EPOLL_IN), or sometimes + // also SRT_EPOLL_UPDATE. All interesting fields for that purpose + // are contained in the CUDTSocket class, so redirect there. + SRT_EPOLL_T events = m_parent->getListenerEvents(); + + // Only light up the events that were returned, do nothing if none is ready, + // the "no event" state is the default. + if (events) + uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, events, true); + + // You don't check anything else here - a listener socket can be only + // used for listening and nothing else. + return; + } + // */ + if (!stillConnected()) return; diff --git a/srtcore/group.h b/srtcore/group.h index 1bd84aeda..c5f147f23 100644 --- a/srtcore/group.h +++ b/srtcore/group.h @@ -206,6 +206,11 @@ class CUDTGroup return m_Group.empty(); } + bool groupConnected() + { + return m_bConnected; + } + void setGroupConnected(); int send(const char* buf, int len, SRT_MSGCTRL& w_mc); diff --git a/test/test_epoll.cpp b/test/test_epoll.cpp index 497bce692..47b488b88 100644 --- a/test/test_epoll.cpp +++ b/test/test_epoll.cpp @@ -563,6 +563,8 @@ TEST(CEPoll, ThreadedUpdate) TEST(CEPoll, LateListenerReady) { + ASSERT_EQ(srt_startup(), 0); + int server_sock = srt_create_socket(), caller_sock = srt_create_socket(); sockaddr_in sa; @@ -629,6 +631,8 @@ TEST(CEPoll, LateListenerReady) srt_close(sock); srt_close(server_sock); srt_close(caller_sock); + + EXPECT_EQ(srt_cleanup(), 0); } From 6827df78cc22ccb8ea2ce75b2b45290e00488ff1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 11 Oct 2023 15:11:02 +0200 Subject: [PATCH 04/15] Added tests for more cases and fixed remaining failures. STILL mutex not enabled. --- srtcore/api.cpp | 20 ++++++- srtcore/group.cpp | 1 + srtcore/group.h | 21 +++++-- test/test_epoll.cpp | 135 +++++++++++++++++++++++++++++++++++--------- 4 files changed, 142 insertions(+), 35 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 9f592a07a..a1e1abb14 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -697,6 +697,12 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, } } + // This is set to true only if the group is to be reported + // from the accept call for the first time. Once it is extracted + // this way, this flag is cleared. + if (should_submit_to_accept) + g->m_bPending = true; + // Update the status in the group so that the next // operation can include the socket in the group operation. CUDTGroup::SocketData* gm = ns->m_GroupMemberData; @@ -850,8 +856,8 @@ SRT_EPOLL_T srt::CUDTSocket::getListenerEvents() // matter is simple - if it's present, you light up the // SRT_EPOLL_ACCEPT flag. -//#if !ENABLE_BONDING -#if 1 +#if !ENABLE_BONDING +//#if 1 ScopedLock accept_lock (m_AcceptLock); // Make it simplified here - nonempty container = have acceptable sockets. @@ -891,7 +897,14 @@ int srt::CUDTUnited::checkQueuedSocketsEvents(const set& sockets) CUDTSocket* s = locateSocket_LOCKED(*i); if (!s) continue; // wiped in the meantime - ignore - if (s->m_GroupOf && s->m_GroupOf->groupConnected()) + + // If this pending socket is a group member, but the group + // to which it belongs is NOT waiting to be accepted, then + // light up the UPDATE event only. Light up ACCEPT only if + // this is a single socket, or this single socket has turned + // the mirror group to be first time available for accept(), + // and this accept() hasn't been done yet. + if (s->m_GroupOf && !s->m_GroupOf->groupPending()) flags |= SRT_EPOLL_UPDATE; else flags |= SRT_EPOLL_ACCEPT; @@ -1222,6 +1235,7 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int { u = s->m_GroupOf->m_GroupID; s->core().m_config.iGroupConnect = 1; // should be derived from ls, but make sure + s->m_GroupOf->m_bPending = false; // Mark the beginning of the connection at the moment // when the group ID is returned to the app caller diff --git a/srtcore/group.cpp b/srtcore/group.cpp index c6b92a0ff..d70a32ba4 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -273,6 +273,7 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_RcvBaseSeqNo(SRT_SEQNO_NONE) , m_bOpened(false) , m_bConnected(false) + , m_bPending(false) , m_bClosing(false) , m_iLastSchedSeqNo(SRT_SEQNO_NONE) , m_iLastSchedMsgNo(SRT_MSGNO_NONE) diff --git a/srtcore/group.h b/srtcore/group.h index d31d9e610..90dcace16 100644 --- a/srtcore/group.h +++ b/srtcore/group.h @@ -203,9 +203,9 @@ class CUDTGroup return m_Group.empty(); } - bool groupConnected() + bool groupPending() { - return m_bConnected; + return m_bPending; } void setGroupConnected(); @@ -664,8 +664,21 @@ class CUDTGroup // from the first delivering socket will be taken as a good deal. sync::atomic m_RcvBaseSeqNo; - bool m_bOpened; // Set to true when at least one link is at least pending - bool m_bConnected; // Set to true on first link confirmed connected + /// True: at least one socket has joined the group in at least pending state + bool m_bOpened; + + /// True: at least one socket is connected, even if pending from the listener + bool m_bConnected; + + /// True: this group was created on the listner side for the first socket + /// that is pending connection, so the group is about to be reported for the + /// srt_accept() call, but the application hasn't retrieved the group yet. + /// Not in use in case of caller-side groups. + // NOTE: using atomic in otder to allow this variable to be changed independently + // on any mutex locks. + sync::atomic m_bPending; + + /// True: the group was requested to close and it should not allow any operations. bool m_bClosing; // There's no simple way of transforming config diff --git a/test/test_epoll.cpp b/test/test_epoll.cpp index 99244642a..46cb10dce 100644 --- a/test/test_epoll.cpp +++ b/test/test_epoll.cpp @@ -545,80 +545,159 @@ TEST(CEPoll, ThreadedUpdate) } } -TEST(CEPoll, LateListenerReady) +void testListenerReady(const bool LATE_CALL, size_t nmembers) { - ASSERT_EQ(srt_startup(), 0); - - int server_sock = srt_create_socket(), caller_sock = srt_create_socket(); - sockaddr_in sa; memset(&sa, 0, sizeof sa); sa.sin_family = AF_INET; sa.sin_port = htons(5555); ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + TestInit init; + + SRTSOCKET server_sock, caller_sock; + server_sock = srt_create_socket(); + + if (nmembers > 0) + { + caller_sock = srt_create_group(SRT_GTYPE_BROADCAST); + int on = 1; + EXPECT_NE(srt_setsockflag(server_sock, SRTO_GROUPCONNECT, &on, sizeof on), SRT_ERROR); + } + else + { + caller_sock = srt_create_socket(); + nmembers = 1; // Set to 1 so that caller starts at least once. + } + srt_bind(server_sock, (sockaddr*)& sa, sizeof(sa)); - srt_listen(server_sock, 1); + srt_listen(server_sock, nmembers+1); srt::setopt(server_sock)[SRTO_RCVSYN] = false; // Ok, the listener socket is ready; now make a call, but // do not do anything on the listener socket yet. -// This macro is to manipulate with the moment when the call is made -// towards the eid subscription. If 1, then the call is made first, -// and then subsciption after a 1s time. Set it to 0 to see how it -// works when the subscription is made first, so the readiness is from -// the listener changing the state. -#define LATE_CALL 1 + std::cout << "Using " << (LATE_CALL ? "LATE" : "EARLY") << " call\n"; -#if LATE_CALL + std::vector> connect_res; - // We don't need the caller to be async, it can hang up here. - auto connect_res = std::async(std::launch::async, [&caller_sock, &sa]() { - return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); - }); + if (LATE_CALL) + { + // We don't need the caller to be async, it can hang up here. + for (size_t i = 0; i < nmembers; ++i) + { + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa]() { + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + })); + } -#endif + std::cout << "STARTED connecting...\n"; + } + + std::cout << "Sleeping 1s...\n"; this_thread::sleep_for(chrono::milliseconds(1000)); // What is important is that the accepted socket is now reporting in // on the listener socket. So let's create an epoll. int eid = srt_epoll_create(); + int eid_postcheck = srt_epoll_create(); // and add this listener to it int modes = SRT_EPOLL_IN; + int modes_postcheck = SRT_EPOLL_IN | SRT_EPOLL_UPDATE; EXPECT_NE(srt_epoll_add_usock(eid, server_sock, &modes), SRT_ERROR); + EXPECT_NE(srt_epoll_add_usock(eid_postcheck, server_sock, &modes_postcheck), SRT_ERROR); -#if !LATE_CALL - - // We don't need the caller to be async, it can hang up here. - auto connect_res = std::async(std::launch::async, [&caller_sock, &sa]() { - return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); - }); + if (!LATE_CALL) + { + // We don't need the caller to be async, it can hang up here. + for (size_t i = 0; i < nmembers; ++i) + { + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa]() { + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + })); + } -#endif + std::cout << "STARTED connecting...\n"; + } + std::cout << "Waiting for readiness...\n"; // And see now if the waiting accepted socket reports it. SRT_EPOLL_EVENT fdset[1]; EXPECT_EQ(srt_epoll_uwait(eid, fdset, 1, 5000), 1); + std::cout << "Accepting...\n"; sockaddr_in scl; int sclen = sizeof scl; SRTSOCKET sock = srt_accept(server_sock, (sockaddr*)& scl, &sclen); EXPECT_NE(sock, SRT_INVALID_SOCK); - EXPECT_EQ(connect_res.get(), SRT_SUCCESS); + if (nmembers > 1) + { + std::cout << "With >1 members, check if there's still UPDATE pending\n"; + // Spawn yet another connection within the group, just to get the update + auto extra_call = std::async(std::launch::async, [&caller_sock, &sa]() { + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + }); + // For 2+ members, additionally check if there AREN'T any + // further acceptance members, but there are UPDATEs. + EXPECT_EQ(srt_epoll_uwait(eid_postcheck, fdset, 1, 5000), 1); + + // SUBSCRIBED EVENTS: IN, UPDATE. + // expected: UPDATE only. + EXPECT_EQ(fdset[0].events, SRT_EPOLL_UPDATE); + EXPECT_NE(extra_call.get(), SRT_INVALID_SOCK); + } + + std::cout << "Joining connector thread(s)\n"; + for (size_t i = 0; i < nmembers; ++i) + { + EXPECT_NE(connect_res[i].get(), SRT_INVALID_SOCK); + } srt_epoll_release(eid); - srt_close(sock); + srt_epoll_release(eid_postcheck); + srt_close(server_sock); srt_close(caller_sock); + srt_close(sock); +} - EXPECT_EQ(srt_cleanup(), 0); +TEST(CEPoll, EarlyListenerReady) +{ + testListenerReady(false, 0); } +TEST(CEPoll, LateListenerReady) +{ + testListenerReady(true, 0); +} + +#if ENABLE_BONDING + +TEST(CEPoll, EarlyGroupListenerReady_1) +{ + testListenerReady(false, 1); +} + +TEST(CEPoll, LateGroupListenerReady_1) +{ + testListenerReady(true, 1); +} + +TEST(CEPoll, EarlyGroupListenerReady_3) +{ + testListenerReady(false, 3); +} + +TEST(CEPoll, LateGroupListenerReady_3) +{ + testListenerReady(true, 3); +} + +#endif class TestEPoll: public srt::Test { From 039291cda0aeca208f77e720f41a08c5aac265a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 11 Oct 2023 16:29:23 +0200 Subject: [PATCH 05/15] Fixed the problem with locking --- srtcore/api.cpp | 21 ++++++++++++--------- srtcore/api.h | 3 +++ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index a1e1abb14..cf524ea14 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -890,8 +890,6 @@ int srt::CUDTUnited::checkQueuedSocketsEvents(const set& sockets) // that they should re-read the group list and re-check readiness. // Now we can do lock once and for all - //ScopedLock glk (m_GlobControlLock); // XXX DEADLOCKS - for (set::iterator i = sockets.begin(); i != sockets.end(); ++i) { CUDTSocket* s = locateSocket_LOCKED(*i); @@ -2438,14 +2436,19 @@ int srt::CUDTUnited::epoll_add_usock(const int eid, const SRTSOCKET u, const int } #endif - CUDTSocket* s = locateSocket(u); - if (s) - { - ret = epoll_add_usock_INTERNAL(eid, s, events); - } - else + // The call to epoll_add_usock_INTERNAL is expected + // to be called under m_GlobControlLock, so use this lock here, too. { - throw CUDTException(MJ_NOTSUP, MN_SIDINVAL); + ScopedLock cs (m_GlobControlLock); + CUDTSocket* s = locateSocket_LOCKED(u); + if (s) + { + ret = epoll_add_usock_INTERNAL(eid, s, events); + } + else + { + throw CUDTException(MJ_NOTSUP, MN_SIDINVAL); + } } return ret; diff --git a/srtcore/api.h b/srtcore/api.h index c049efde9..d08dca5b5 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -275,7 +275,10 @@ class CUDTUnited int& w_error, CUDT*& w_acpu); +#if ENABLE_BONDING + SRT_ATTR_REQUIRES(m_GlobControlLock) int checkQueuedSocketsEvents(const std::set& sockets); +#endif int installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq); int installConnectHook(const SRTSOCKET lsn, srt_connect_callback_fn* hook, void* opaq); From bd9c4de9300ca9e8644a54f74596a8ed0a52ae0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Fri, 13 Oct 2023 11:10:05 +0200 Subject: [PATCH 06/15] Fixed a problem: invalid per-listener report of member connections. Any pending connection can accept a group. Removed other pending sockets after accepting a group. Fixed tests to handle options also with RAII srt initializer --- docs/API/API-functions.md | 56 +++++++++-- srtcore/api.cpp | 110 +++++++++++++++++--- srtcore/api.h | 1 + srtcore/core.cpp | 3 +- srtcore/epoll.cpp | 25 ++++- srtcore/group.cpp | 20 ++++ srtcore/group.h | 16 ++- test/test_env.h | 7 +- test/test_epoll.cpp | 205 ++++++++++++++++++++++++++++++++++++++ 9 files changed, 410 insertions(+), 33 deletions(-) diff --git a/docs/API/API-functions.md b/docs/API/API-functions.md index 90ed257dd..182ecdebc 100644 --- a/docs/API/API-functions.md +++ b/docs/API/API-functions.md @@ -652,16 +652,52 @@ the call will block until the incoming connection is ready. Otherwise, the call always returns immediately. The `SRT_EPOLL_IN` epoll event should be checked on the `lsn` socket prior to calling this function in that case. -If the pending connection is a group connection (initiated on the peer side by -calling the connection function using a group ID, and permitted on the listener -socket by the [`SRTO_GROUPCONNECT`](API-socket-options.md#SRTO_GROUPCONNECT) -flag), then the value returned is a group ID. This function then creates a new -group, as well as a new socket for this connection, that will be added to the -group. Once the group is created this way, further connections within the same -group, as well as sockets for them, will be created in the background. The -[`SRT_EPOLL_UPDATE`](#SRT_EPOLL_UPDATE) event is raised on the `lsn` socket when -a new background connection is attached to the group, although it's usually for -internal use only. +Note that this event might sometimes be spurious in case when the link for +the pending connection gets broken before the accepting operation is finished. +The `SRT_EPOLL_IN` flag set for the listener socket is still not a guarantee +that the following call to `srt_accept` will succeed. + +If the listener socket is allowed to accept group connections, if it is set the +[`SRTO_GROUPCONNECT`](API-socket-options.md#SRTO_GROUPCONNECT) flag, then +a group ID will be returned, if it's a pending group connection. This can be +recognized by checking if `SRTGROUP_MASK` is set on the returned value. +Accepting a group connection differs to accepting a single socket connection +by that: + +1. The connection is reported only for the very first socket that has been +successfully connected. Only for this case will the `SRT_EPOLL_IN` flag be +set on the listener and only in this case will the `srt_accept` call report +the connection. + +2. Further member connections of the group that has been already once accepted +will be handled in the background, and the listener socket will no longer get +the `SRT_EPOLL_IN` flag set when it happens. Instead the +[`SRT_EPOLL_UPDATE`](#SRT_EPOLL_UPDATE) flag will be set. This flag is +edge-triggered-only because there is no operation to be performed in response +that could make this flag cleared. It's mostly used internally and the +application may use it to update its group data cache. + +3. If your application has created more than one listener socket that has +allowed group connections, every newly connected socket that is a member of an +already connected group will join this group no matter to which listener +socket it was reported. If you want to use this feature, you should take special +care of how you perform the accept operation. The group connection may be +accepted off any of these listener sockets, but still only once. It is then +recommended: + + * In non-blocking mode, poll on all listener sockets that are expected to + get a group connection at once + + * In blocking mode, use `srt_accept_bond` instead (it uses epoll internally) + +Note also that in this case there are more chances for `SRT_EPOLL_IN` flag +to be spurious. For example, if you have two listener sockets configured for +group connection and on each of them there's a pending connection, you will +have `SRT_EPOLL_IN` flag set on both listener sockets. However, once you +accept the group connection on any of them, the other pending connection will +get automatically handled in the background and the existing `SRT_EPOLL_IN` +flag will be spurious (calling `srt_accept` on that listener socket will fail). + | Returns | | |:----------------------------- |:----------------------------------------------------------------------- | diff --git a/srtcore/api.cpp b/srtcore/api.cpp index cf524ea14..ad8bb154d 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -681,6 +681,17 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, goto ERR_ROLLBACK; } + // Acceptance of the group will have to be done through accepting + // of one of the pending sockets. There can be, however, multiple + // such sockets at a time, some of them might get broken before + // being accepted, and therefore we need to make all sockets ready. + // But then, acceptance of a group may happen only once, so if any + // sockets of the same group were submitted to accept, they must + // be removed from the accept queue at this time. + should_submit_to_accept = g->groupPending(); + + /* XXX remove if no longer informational + // Check if this is the first socket in the group. // If so, give it up to accept, otherwise just do nothing // The client will be informed about the newly added connection at the @@ -696,12 +707,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, break; } } - - // This is set to true only if the group is to be reported - // from the accept call for the first time. Once it is extracted - // this way, this flag is cleared. - if (should_submit_to_accept) - g->m_bPending = true; + */ // Update the status in the group so that the next // operation can include the socket in the group operation. @@ -714,11 +720,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, gm->rcvstate = SRT_GST_IDLE; gm->laststatus = SRTS_CONNECTED; - if (!g->m_bConnected) - { - HLOGC(cnlog.Debug, log << "newConnection(GROUP): First socket connected, SETTING GROUP CONNECTED"); - g->m_bConnected = true; - } + g->setGroupConnected(); // XXX PROLBEM!!! These events are subscribed here so that this is done once, lazily, // but groupwise connections could be accepted from multiple listeners for the same group! @@ -1231,13 +1233,16 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int // it's a theoretically possible scenario if (s->m_GroupOf) { - u = s->m_GroupOf->m_GroupID; - s->core().m_config.iGroupConnect = 1; // should be derived from ls, but make sure - s->m_GroupOf->m_bPending = false; - + CUDTGroup* g = s->m_GroupOf; // Mark the beginning of the connection at the moment // when the group ID is returned to the app caller - s->m_GroupOf->m_stats.tsLastSampleTime = steady_clock::now(); + g->m_stats.tsLastSampleTime = steady_clock::now(); + + HLOGC(cnlog.Debug, log << "accept: reporting group $" << g->m_GroupID << " instead of member socket @" << u); + u = g->m_GroupID; + s->core().m_config.iGroupConnect = 1; // should be derived from ls, but make sure + g->m_bPending = false; + CUDT::uglobal().removePendingForGroup(g); } else { @@ -1263,6 +1268,79 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int return u; } +#if ENABLE_BONDING + +// [[using locked(m_GlobControlLock)]] +void srt::CUDTUnited::removePendingForGroup(const CUDTGroup* g) +{ + // We don't have a list of listener sockets that have ever + // reported a pending connection for a group, so the only + // way to find them is to ride over the list of all sockets... + + list members; + g->getMemberSockets((members)); + + for (sockets_t::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++i) + { + CUDTSocket* s = i->second; + // Check if any of them is a listener socket... + + /* XXX This is left for information only that we are only + interested with listener sockets - with the current + implementation checking it is pointless because the + m_QueuedSockets structure is present in every socket + anyway even if it's not a listener, and only listener + sockets may have this container nonempty. So checking + the container should suffice. + + if (!s->core().m_bListening) + continue; + */ + + if (s->m_QueuedSockets.empty()) + continue; + + // Somehow fortunate for us that it's a set, so we + // can simply check if this allegedly listener socket + // contains any of them. + for (list::iterator m = members.begin(), mx = m; m != members.end(); m = mx) + { + ++mx; + std::set::iterator q = s->m_QueuedSockets.find(*m); + if (q != s->m_QueuedSockets.end()) + { + HLOGC(cnlog.Debug, log << "accept: listener @" << s->m_SocketID + << " had ququed member @" << *m << " -- removed"); + // Found an intersection socket. + // Remove it from the listener queue + s->m_QueuedSockets.erase(q); + + // NOTE ALSO that after this removal the queue may be EMPTY, + // and if so, the listener socket should be no longer ready for accept. + if (s->m_QueuedSockets.empty()) + { + m_EPoll.update_events(s->m_SocketID, s->core().m_sPollID, SRT_EPOLL_ACCEPT, false); + } + + // and remove it also from the members list. + // This can be done safely because we use a SAFE LOOP. + // We can also do it safely because a socket may be + // present in only one listener socket in the whole app. + members.erase(m); + } + } + + // It may happen that the list of members can be + // eventually purged even if we haven't checked every socket. + // If it happens so, quit immediately because there's nothing + // left to do. + if (members.empty()) + return; + } +} + +#endif + int srt::CUDTUnited::connect(SRTSOCKET u, const sockaddr* srcname, const sockaddr* tarname, int namelen) { // Here both srcname and tarname must be specified diff --git a/srtcore/api.h b/srtcore/api.h index d08dca5b5..412375688 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -278,6 +278,7 @@ class CUDTUnited #if ENABLE_BONDING SRT_ATTR_REQUIRES(m_GlobControlLock) int checkQueuedSocketsEvents(const std::set& sockets); + void removePendingForGroup(const CUDTGroup* g); #endif int installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index fe30efa2b..c6b50292c 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -3300,7 +3300,8 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3 // This can only happen on a listener (it's only called on a site that is // HSD_RESPONDER), so it was a response for a groupwise connection. // Therefore such a group shall always be considered opened. - gp->setOpen(); + // It's also set pending and it stays this way until accepted. + gp->setOpenPending(); HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: no group has peer=$" << peergroup << " - creating new mirror group $" diff --git a/srtcore/epoll.cpp b/srtcore/epoll.cpp index e2b861bf9..80fe53a65 100644 --- a/srtcore/epoll.cpp +++ b/srtcore/epoll.cpp @@ -506,18 +506,23 @@ int srt::CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int6 ScopedLock pg(m_EPollLock); map::iterator p = m_mPolls.find(eid); if (p == m_mPolls.end()) + { + LOGC(ealog.Error, log << "epoll_uwait: E" << eid << " doesn't exist"); throw CUDTException(MJ_NOTSUP, MN_EIDINVAL); + } CEPollDesc& ed = p->second; if (!ed.flags(SRT_EPOLL_ENABLE_EMPTY) && ed.watch_empty()) { // Empty EID is not allowed, report error. + LOGC(ealog.Error, log << "epoll_uwait: E" << eid << " is empty (use SRT_EPOLL_ENABLE_EMPTY to allow)"); throw CUDTException(MJ_NOTSUP, MN_EEMPTY); } if (ed.flags(SRT_EPOLL_ENABLE_OUTPUTCHECK) && (fdsSet == NULL || fdsSize == 0)) { - // Empty EID is not allowed, report error. + // Empty container is not allowed, report error. + LOGC(ealog.Error, log << "epoll_uwait: empty output container with E" << eid << " (use SRT_EPOLL_ENABLE_OUTPUTCHECK to allow)"); throw CUDTException(MJ_NOTSUP, MN_INVAL); } @@ -525,6 +530,7 @@ int srt::CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int6 { // XXX Add error log // uwait should not be used with EIDs subscribed to system sockets + LOGC(ealog.Error, log << "epoll_uwait: E" << eid << " is subscribed to system sckets (not allowed for uwait)"); throw CUDTException(MJ_NOTSUP, MN_INVAL); } @@ -536,11 +542,20 @@ int srt::CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int6 ++total; if (total > fdsSize) + { + HLOGC(ealog.Debug, log << "epoll_uwait: output container size=" << fdsSize << " insufficient to report all sockets"); break; + } fdsSet[pos] = *i; + IF_HEAVY_LOGGING(std::ostringstream out); + IF_HEAVY_LOGGING(out << "epoll_uwait: Notice: fd=" << i->fd << " events="); + IF_HEAVY_LOGGING(PrintEpollEvent(out, i->events, 0)); + + SRT_ATR_UNUSED const bool was_edge = ed.checkEdge(i++); // NOTE: potentially deletes `i` + IF_HEAVY_LOGGING(out << (was_edge ? "(^)" : "")); + HLOGP(ealog.Debug, out.str()); - ed.checkEdge(i++); // NOTE: potentially deletes `i` } if (total) return total; @@ -875,6 +890,12 @@ int srt::CEPoll::update_events(const SRTSOCKET& uid, std::set& eids, const return -1; // still, ignored. } + if (uid == SRT_INVALID_SOCK || uid == 0) + { + LOGC(eilog.Fatal, log << "epoll/update: IPE: invalid 'uid' submitted for update!"); + return -1; + } + int nupdated = 0; vector lost; diff --git a/srtcore/group.cpp b/srtcore/group.cpp index d70a32ba4..f3931d1d6 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -3944,6 +3944,7 @@ void CUDTGroup::setGroupConnected() { if (!m_bConnected) { + HLOGC(cnlog.Debug, log << "GROUP: First socket connected, SETTING GROUP CONNECTED"); // Switch to connected state and give appropriate signal m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_CONNECT, true); m_bConnected = true; @@ -4016,6 +4017,25 @@ void CUDTGroup::updateLatestRcv(CUDTSocket* s) } } +struct FExtractGroupID +{ + SRTSOCKET operator()(const groups::SocketData& d) + { + return d.id; + } +}; + +void CUDTGroup::getMemberSockets(std::list& w_ids) const +{ + ScopedLock gl (m_GroupLock); + + for (cgli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi) + { + w_ids.push_back(gi->id); + } +} + + void CUDTGroup::activateUpdateEvent(bool still_have_items) { // This function actually reacts on the fact that a socket diff --git a/srtcore/group.h b/srtcore/group.h index 90dcace16..3ee51bb47 100644 --- a/srtcore/group.h +++ b/srtcore/group.h @@ -101,6 +101,7 @@ class CUDTGroup typedef std::list group_t; typedef group_t::iterator gli_t; + typedef group_t::const_iterator cgli_t; typedef std::vector< std::pair > sendable_t; struct Sendstate @@ -402,7 +403,7 @@ class CUDTGroup void getGroupCount(size_t& w_size, bool& w_still_alive); srt::CUDTUnited& m_Global; - srt::sync::Mutex m_GroupLock; + mutable srt::sync::Mutex m_GroupLock; SRTSOCKET m_GroupID; SRTSOCKET m_PeerGroupID; @@ -431,6 +432,8 @@ class CUDTGroup gli_t begin() { return m_List.begin(); } gli_t end() { return m_List.end(); } + cgli_t begin() const { return m_List.begin(); } + cgli_t end() const { return m_List.end(); } bool empty() { return m_List.empty(); } void push_back(const SocketData& data) { m_List.push_back(data); ++m_SizeCache; } void clear() @@ -752,7 +755,14 @@ class CUDTGroup // Required after the call on newGroup on the listener side. // On the listener side the group is lazily created just before // accepting a new socket and therefore always open. - void setOpen() { m_bOpened = true; } + // However, after creation it will be still waiting for being + // extracted by the application in `srt_accept`, and until then + // it stays as pending. + void setOpenPending() + { + m_bOpened = true; + m_bPending = true; + } std::string CONID() const { @@ -812,6 +822,8 @@ class CUDTGroup void updateLatestRcv(srt::CUDTSocket*); + void getMemberSockets(std::list&) const; + // Property accessors SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, id, m_GroupID); SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, peerid, m_PeerGroupID); diff --git a/test/test_env.h b/test/test_env.h index 2e2f78f7d..2b26995dd 100644 --- a/test/test_env.h +++ b/test/test_env.h @@ -53,7 +53,11 @@ class TestInit static void start(int& w_retstatus); static void stop(); - TestInit() { start((ninst)); } + TestInit() + { + start((ninst)); + HandlePerTestOptions(); + } ~TestInit() { stop(); } void HandlePerTestOptions(); @@ -71,7 +75,6 @@ class Test: public testing::Test void SetUp() override final { init_holder.reset(new TestInit); - init_holder->HandlePerTestOptions(); setup(); } diff --git a/test/test_epoll.cpp b/test/test_epoll.cpp index 46cb10dce..ec3cf7fd5 100644 --- a/test/test_epoll.cpp +++ b/test/test_epoll.cpp @@ -12,6 +12,36 @@ using namespace std; using namespace srt; +static ostream& PrintEpollEvent(ostream& os, int events, int et_events = 0) +{ + static pair const namemap [] = { + make_pair(SRT_EPOLL_IN, "R"), + make_pair(SRT_EPOLL_OUT, "W"), + make_pair(SRT_EPOLL_ERR, "E"), + make_pair(SRT_EPOLL_UPDATE, "U") + }; + bool any = false; + + const int N = (int)Size(namemap); + + for (int i = 0; i < N; ++i) + { + if (events & namemap[i].first) + { + os << "["; + if (et_events & namemap[i].first) + os << "^"; + os << namemap[i].second << "]"; + any = true; + } + } + + if (!any) + os << "[]"; + + return os; +} + TEST(CEPoll, InfiniteWait) { @@ -697,8 +727,183 @@ TEST(CEPoll, LateGroupListenerReady_3) testListenerReady(true, 3); } + +void testMultipleListenerReady(const bool LATE_CALL) +{ + sockaddr_in sa; + memset(&sa, 0, sizeof sa); + sa.sin_family = AF_INET; + sa.sin_port = htons(5555); + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + + sockaddr_in sa2; + memset(&sa2, 0, sizeof sa2); + sa2.sin_family = AF_INET; + sa2.sin_port = htons(5556); + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa2.sin_addr), 1); + + TestInit init; + + SRTSOCKET server_sock, server_sock2, caller_sock; + server_sock = srt_create_socket(); + server_sock2 = srt_create_socket(); + + caller_sock = srt_create_group(SRT_GTYPE_BROADCAST); + int on = 1; + EXPECT_NE(srt_setsockflag(server_sock, SRTO_GROUPCONNECT, &on, sizeof on), SRT_ERROR); + EXPECT_NE(srt_setsockflag(server_sock2, SRTO_GROUPCONNECT, &on, sizeof on), SRT_ERROR); + + srt_bind(server_sock, (sockaddr*)& sa, sizeof(sa)); + srt_listen(server_sock, 3); + srt::setopt(server_sock)[SRTO_RCVSYN] = false; + + srt_bind(server_sock2, (sockaddr*)& sa2, sizeof(sa2)); + srt_listen(server_sock2, 3); + srt::setopt(server_sock2)[SRTO_RCVSYN] = false; + + // Ok, the listener socket is ready; now make a call, but + // do not do anything on the listener socket yet. + + std::cout << "Using " << (LATE_CALL ? "LATE" : "EARLY") << " call\n"; + + std::vector> connect_res; + + if (LATE_CALL) + { + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa]() { + this_thread::sleep_for(chrono::milliseconds(1)); + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + })); + + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa2]() { + this_thread::sleep_for(chrono::milliseconds(1)); + return srt_connect(caller_sock, (sockaddr*)& sa2, sizeof(sa)); + })); + + + std::cout << "STARTED connecting...\n"; + } + + std::cout << "Sleeping 1s...\n"; + this_thread::sleep_for(chrono::milliseconds(1000)); + + // What is important is that the accepted socket is now reporting in + // on the listener socket. So let's create an epoll. + + int eid = srt_epoll_create(); + int eid_postcheck = srt_epoll_create(); + + // and add this listener to it + int modes = SRT_EPOLL_IN; + int modes_postcheck = SRT_EPOLL_IN | SRT_EPOLL_UPDATE; + EXPECT_NE(srt_epoll_add_usock(eid, server_sock, &modes), SRT_ERROR); + EXPECT_NE(srt_epoll_add_usock(eid, server_sock2, &modes), SRT_ERROR); + EXPECT_NE(srt_epoll_add_usock(eid_postcheck, server_sock, &modes_postcheck), SRT_ERROR); + EXPECT_NE(srt_epoll_add_usock(eid_postcheck, server_sock2, &modes_postcheck), SRT_ERROR); + + if (!LATE_CALL) + { + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa]() { + this_thread::sleep_for(chrono::milliseconds(1)); + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + })); + + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa2]() { + this_thread::sleep_for(chrono::milliseconds(1)); + return srt_connect(caller_sock, (sockaddr*)& sa2, sizeof(sa)); + })); + + std::cout << "STARTED connecting...\n"; + } + + // Sleep to make sure that the connection process has started. + this_thread::sleep_for(chrono::milliseconds(100)); + + std::cout << "Waiting for readiness on @" << server_sock << " and @" << server_sock2 << "\n"; + // And see now if the waiting accepted socket reports it. + + // This time we should expect that the connection reports in + // on two listener sockets + SRT_EPOLL_EVENT fdset[2] = {}; + std::ostringstream out; + + int nready = srt_epoll_uwait(eid, fdset, 2, 5000); + EXPECT_EQ(nready, 2); + out << "Ready socks:"; + for (int i = 0; i < nready; ++i) + { + out << " @" << fdset[i].fd; + PrintEpollEvent(out, fdset[i].events); + } + out << std::endl; + std::cout << out.str(); + + std::cout << "Accepting...\n"; + sockaddr_in scl; + int sclen = sizeof scl; + + // We choose the SECOND one to extract the group connection. + SRTSOCKET sock = srt_accept(server_sock2, (sockaddr*)& scl, &sclen); + EXPECT_NE(sock, SRT_INVALID_SOCK); + + // Make sure this time that the accepted connection is a group. + EXPECT_EQ(sock & SRTGROUP_MASK, SRTGROUP_MASK); + + std::cout << "Check if there's still UPDATE pending\n"; + // Spawn yet another connection within the group, just to get the update + auto extra_call = std::async(std::launch::async, [&caller_sock, &sa]() { + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + }); + // For 2+ members, additionally check if there AREN'T any + // further acceptance members, but there are UPDATEs. + // Note that if this was done AFTER accepting, the UPDATE would + // be only set one one socket. + nready = srt_epoll_uwait(eid_postcheck, fdset, 1, 5000); + EXPECT_EQ(nready, 1); + + std::cout << "Ready socks:"; + for (int i = 0; i < nready; ++i) + { + std::cout << " @" << fdset[i].fd; + PrintEpollEvent(std::cout, fdset[i].events); + } + std::cout << std::endl; + + // SUBSCRIBED EVENTS: IN, UPDATE. + // expected: UPDATE only. + EXPECT_EQ(fdset[0].events, SRT_EPOLL_UPDATE); + EXPECT_NE(extra_call.get(), SRT_INVALID_SOCK); + + std::cout << "Joining connector thread(s)\n"; + for (size_t i = 0; i < connect_res.size(); ++i) + { + EXPECT_NE(connect_res[i].get(), SRT_INVALID_SOCK); + } + + srt_epoll_release(eid); + srt_epoll_release(eid_postcheck); + + srt_close(server_sock); + srt_close(server_sock2); + srt_close(caller_sock); + srt_close(sock); +} + +TEST(CEPoll, EarlyGroupMultiListenerReady) +{ + testMultipleListenerReady(false); +} + +TEST(CEPoll, LateGroupMultiListenerReady) +{ + testMultipleListenerReady(true); +} + + + #endif + class TestEPoll: public srt::Test { protected: From 8d3c5ca795af950ff5bd492aff6bc647232ff9bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Fri, 13 Oct 2023 16:08:36 +0200 Subject: [PATCH 07/15] Updated documentation --- docs/API/API-functions.md | 166 ++++++++++++++++++++++---------------- 1 file changed, 98 insertions(+), 68 deletions(-) diff --git a/docs/API/API-functions.md b/docs/API/API-functions.md index 182ecdebc..6269c4373 100644 --- a/docs/API/API-functions.md +++ b/docs/API/API-functions.md @@ -627,76 +627,106 @@ the listener socket to accept group connections SRTSOCKET srt_accept(SRTSOCKET lsn, struct sockaddr* addr, int* addrlen); ``` -Accepts a pending connection, then creates and returns a new socket or -group ID that handles this connection. The group and socket can be -distinguished by checking the `SRTGROUP_MASK` bit on the returned ID. - -* `lsn`: the listener socket previously configured by [`srt_listen`](#srt_listen) -* `addr`: the IP address and port specification for the remote party +Extracts the first connection request on the queue of pending connections for +the listening socket, `lsn`, then creates and returns a new socket or group ID +that handles this connection. The group and socket can be distinguished by +checking the `SRTGROUP_MASK` bit on the returned ID. Note that by default group +connections will be rejected - this feature can be only enabled on demand (see +below). + +* `lsn`: the listening socket +* `addr`: a location to store the remote IP address and port for the connection * `addrlen`: INPUT: size of `addr` pointed object. OUTPUT: real size of the returned object -**NOTE:** `addr` is allowed to be NULL, in which case it's understood that the -application is not interested in the address from which the connection originated. -Otherwise `addr` should specify an object into which the address will be written, -and `addrlen` must also specify a variable to contain the object size. Note also -that in the case of group connection only the initial connection that -establishes the group connection is returned, together with its address. As -member connections are added or broken within the group, you can obtain this -information through [`srt_group_data`](#srt_group_data) or the data filled by -[`srt_sendmsg2`](#srt_sendmsg) and [`srt_recvmsg2`](#srt_recvmsg2). - -If the `lsn` listener socket is configured for blocking mode -([`SRTO_RCVSYN`](API-socket-options.md#SRTO_RCVSYN) set to true, default), -the call will block until the incoming connection is ready. Otherwise, the -call always returns immediately. The `SRT_EPOLL_IN` epoll event should be -checked on the `lsn` socket prior to calling this function in that case. - -Note that this event might sometimes be spurious in case when the link for -the pending connection gets broken before the accepting operation is finished. -The `SRT_EPOLL_IN` flag set for the listener socket is still not a guarantee -that the following call to `srt_accept` will succeed. - -If the listener socket is allowed to accept group connections, if it is set the -[`SRTO_GROUPCONNECT`](API-socket-options.md#SRTO_GROUPCONNECT) flag, then -a group ID will be returned, if it's a pending group connection. This can be -recognized by checking if `SRTGROUP_MASK` is set on the returned value. -Accepting a group connection differs to accepting a single socket connection -by that: - -1. The connection is reported only for the very first socket that has been -successfully connected. Only for this case will the `SRT_EPOLL_IN` flag be -set on the listener and only in this case will the `srt_accept` call report -the connection. - -2. Further member connections of the group that has been already once accepted -will be handled in the background, and the listener socket will no longer get -the `SRT_EPOLL_IN` flag set when it happens. Instead the -[`SRT_EPOLL_UPDATE`](#SRT_EPOLL_UPDATE) flag will be set. This flag is -edge-triggered-only because there is no operation to be performed in response -that could make this flag cleared. It's mostly used internally and the -application may use it to update its group data cache. - -3. If your application has created more than one listener socket that has -allowed group connections, every newly connected socket that is a member of an -already connected group will join this group no matter to which listener -socket it was reported. If you want to use this feature, you should take special -care of how you perform the accept operation. The group connection may be -accepted off any of these listener sockets, but still only once. It is then -recommended: - - * In non-blocking mode, poll on all listener sockets that are expected to - get a group connection at once - - * In blocking mode, use `srt_accept_bond` instead (it uses epoll internally) - -Note also that in this case there are more chances for `SRT_EPOLL_IN` flag -to be spurious. For example, if you have two listener sockets configured for -group connection and on each of them there's a pending connection, you will -have `SRT_EPOLL_IN` flag set on both listener sockets. However, once you -accept the group connection on any of them, the other pending connection will -get automatically handled in the background and the existing `SRT_EPOLL_IN` -flag will be spurious (calling `srt_accept` on that listener socket will fail). +General requirements for parameter correctness: + +* `lsn` must be first [bound](#srt_bind) and [listening](#srt_listen) + +* `addr` may be NULL, or otherwise it must be a pointer to an object +that can be treated as an instance of `sockaddr_in` or `sockaddr_in6` + +* `addrlen` should be a pointer to a variable set to the size of the object +specified in `addr`, if `addr` is not NULL. Otherwise it's ignored. + +If `addr` is not NULL, the information about the source IP address and +port of the peer will be written into this object. Note that whichever +type of object is expected here (`sockaddr_in` or `sockaddr_in6`), it +depends on the address type used in the `srt_bind` call for `lsn`. +If unsure in a particular situation, it is recommended that you use +`sockaddr_storage` or `srt::sockaddr_any`. + +If the `lsn` listener socket is blocking mode (if +[`SRTO_RCVSYN`](API-socket-options.md#SRTO_RCVSYN) is set to true, +which is default), the call will block until the incoming connection is ready +for extraction. Otherwise, the call always returns immediately, possibly with +failure, if there was no pending connection waiting on the listening socket +`lsn`. + +The listener socket can be checked for any pending connections prior to calling +`srt_accept` by checking the `SRT_EPOLL_ACCEPT` epoll event (which is an alias +to `SRT_EPOLL_IN`). This event might be spurious in certain cases though, for +example, when the connection has been closed by the peer or broken before the +application extracts it. The call to `srt_accept` would then still fail in +such a case. + +In order to allow the listening socket `lsn` to accept a group connection, +the [`SRTO_GROUPCONNECT`](API-socket-options.md#SRTO_GROUPCONNECT) socket option +for the listening socket must be set to 1. Note that single socket connections +can still be reported to that socket. The application can distinguish the socket +and group connection by checking the `SRTGROUP_MASK` bit on the returned +successful value. There are some important differences to single socket +connections: + +1. Accepting a group connection can be done only once per connection. The +actual connection reporter is a socket, like before, but once you call +`srt_accept` and receive this group ID, it is the group considered connected, +and any other member connections of the same group will be handled in the +background. + +2. If a group was extracted from the `srt_accept` call, the address reported in +`addr` parameter is still the address of the connection that has triggered the +group connection extraction. While the group is connected, potentially new +connections may be added and any existing ones get broken at any time. The +information about all member connections, that are active at the moment, can be +obtained at any time through [`srt_group_data`](#srt_group_data) or the data +filled by [`srt_sendmsg2`](#srt_sendmsg2) and [`srt_recvmsg2`](#srt_recvmsg2) +in the [`SRT_MSGCTRL`](#SRT_MSGCTRL) structure. + +3. Listening sockets are not bound to groups anyhow. You can allow multiple +listening sockets to accept group connections and the connection extracted +from the listener, if it is declared to be a group member, will join its +group, no matter which of the listening sockets has received the connection +request. This feature is prone to more tricky rules, however: + + * If you use multiple listener sockets, all of them in blocking mode, + allowed for group connections, and receiving connection requests for + the same group at the moment, and you run one thread per `srt_accept` + call, it is undefined, which of them will extract the group ID + for the connection, but still only one will, while the others will + continue blocking. If you want to use only one thread for accepting + connections from potentially multiple listening sockets in the blocking + mode, you should use [`srt_accept_bond`](#srt_accept_bond) instead. + + * If at the moment multiple listener sockets have received connection + request and you query them all for readiness epoll flags (by calling + an epoll waiting function), all of them will get the `SRT_EPOLL_ACCEPT` + flag set, but still only one of them will return the group ID from the + `srt_accept` call. After this call, from all listener sockets in the + whole application the `SRT_EPOLL_ACCEPT` flag, that was set by the reason + of a pending connection for the same group, will be withdrawn (that is, + it will be cleared if there are no other pending connections). This is + then yet another situation when this flag can be spurious. + +4. If you query a listening socket for epoll flags after the `srt_accept` +function has once returned the group ID, the listening sockets that have +received new member connection requests within that group will report only the +[`SRT_EPOLL_UPDATE`](#SRT_EPOLL_UPDATE) flag. This flag is edge-triggered-only +because there is no operation you can perform in response in order to clear +this flag. This flag is mostly used internally and the application may use it +if it would like to trigger updating the current group information due to +having one newly added member connection. + | Returns | | @@ -707,7 +737,7 @@ flag will be spurious (calling `srt_accept` on that listener socket will fail). | Errors | | |:--------------------------------- |:----------------------------------------------------------------------- | -| [`SRT_EINVPARAM`](#srt_einvparam) | NULL specified as `addrlen`, when `addr` is not NULL | +| [`SRT_EINVPARAM`](#srt_einvparam) | Invalid `addr` or `addrlen` (see requirements in the begininng) | | [`SRT_EINVSOCK`](#srt_einvsock) | `lsn` designates no valid socket ID. | | [`SRT_ENOLISTEN`](#srt_enolisten) | `lsn` is not set up as a listener ([`srt_listen`](#srt_listen) not called). | | [`SRT_EASYNCRCV`](#srt_easyncrcv) | No connection reported so far. This error is reported only in the non-blocking mode | From ab96b4d6a7cb2debb9710365ef2ba5b44025b937 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Fri, 13 Oct 2023 17:37:25 +0200 Subject: [PATCH 08/15] Weird warning/error on Mac --- test/test_epoll.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/test_epoll.cpp b/test/test_epoll.cpp index ec3cf7fd5..0d4e29a67 100644 --- a/test/test_epoll.cpp +++ b/test/test_epoll.cpp @@ -12,7 +12,8 @@ using namespace std; using namespace srt; -static ostream& PrintEpollEvent(ostream& os, int events, int et_events = 0) +namespace { +ostream& PrintEpollEvent(ostream& os, int events, int et_events = 0) { static pair const namemap [] = { make_pair(SRT_EPOLL_IN, "R"), @@ -41,6 +42,7 @@ static ostream& PrintEpollEvent(ostream& os, int events, int et_events = 0) return os; } +} TEST(CEPoll, InfiniteWait) From 9b93a3a1ef36509632302b1d2cc8048009543ec4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Mon, 16 Oct 2023 11:26:06 +0200 Subject: [PATCH 09/15] Some cosmetic fixes. Fixes for CI build breaks --- srtcore/api.cpp | 1 - srtcore/common.cpp | 48 +++++++++++++++++++++++++++++++++++++++ srtcore/common.h | 19 ++-------------- srtcore/core.cpp | 2 -- srtcore/epoll.cpp | 31 ------------------------- srtcore/group.cpp | 9 -------- test/test_epoll.cpp | 31 ------------------------- testing/srt-test-live.cpp | 2 +- 8 files changed, 51 insertions(+), 92 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index ad8bb154d..bbc7be2c9 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -859,7 +859,6 @@ SRT_EPOLL_T srt::CUDTSocket::getListenerEvents() // SRT_EPOLL_ACCEPT flag. #if !ENABLE_BONDING -//#if 1 ScopedLock accept_lock (m_AcceptLock); // Make it simplified here - nonempty container = have acceptable sockets. diff --git a/srtcore/common.cpp b/srtcore/common.cpp index dc60b3db7..61ea56311 100644 --- a/srtcore/common.cpp +++ b/srtcore/common.cpp @@ -463,6 +463,54 @@ bool SrtParseConfig(const string& s, SrtConfig& w_config) return true; } + +std::string FormatLossArray(const std::vector< std::pair >& lra) +{ + std::ostringstream os; + + os << "[ "; + for (std::vector< std::pair >::const_iterator i = lra.begin(); i != lra.end(); ++i) + { + int len = CSeqNo::seqoff(i->first, i->second); + os << "%" << i->first; + if (len > 1) + os << "+" << len; + os << " "; + } + + os << "]"; + return os.str(); +} + +ostream& PrintEpollEvent(ostream& os, int events, int et_events) +{ + static pair const namemap [] = { + make_pair(SRT_EPOLL_IN, "R"), + make_pair(SRT_EPOLL_OUT, "W"), + make_pair(SRT_EPOLL_ERR, "E"), + make_pair(SRT_EPOLL_UPDATE, "U") + }; + bool any = false; + + const int N = (int)Size(namemap); + + for (int i = 0; i < N; ++i) + { + if (events & namemap[i].first) + { + os << "["; + if (et_events & namemap[i].first) + os << "^"; + os << namemap[i].second << "]"; + any = true; + } + } + + if (!any) + os << "[]"; + + return os; +} } // namespace srt namespace srt_logging diff --git a/srtcore/common.h b/srtcore/common.h index bfc0c1d96..276e47c7c 100644 --- a/srtcore/common.h +++ b/srtcore/common.h @@ -1430,23 +1430,8 @@ inline bool checkMappedIPv4(const sockaddr_in6& sa) return checkMappedIPv4(addr); } -inline std::string FormatLossArray(const std::vector< std::pair >& lra) -{ - std::ostringstream os; - - os << "[ "; - for (std::vector< std::pair >::const_iterator i = lra.begin(); i != lra.end(); ++i) - { - int len = CSeqNo::seqoff(i->first, i->second); - os << "%" << i->first; - if (len > 1) - os << "+" << len; - os << " "; - } - - os << "]"; - return os.str(); -} +std::string FormatLossArray(const std::vector< std::pair >& lra); +std::ostream& PrintEpollEvent(std::ostream& os, int events, int et_events = 0); } // namespace srt diff --git a/srtcore/core.cpp b/srtcore/core.cpp index c6b50292c..3ec9554b4 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -11676,7 +11676,6 @@ void srt::CUDT::addEPoll(const int eid) m_sPollID.insert(eid); leaveCS(uglobal().m_EPoll.m_EPollLock); - //* if (m_bListening) { // A listener socket can only get readiness on SRT_EPOLL_ACCEPT @@ -11694,7 +11693,6 @@ void srt::CUDT::addEPoll(const int eid) // used for listening and nothing else. return; } - // */ if (!stillConnected()) return; diff --git a/srtcore/epoll.cpp b/srtcore/epoll.cpp index 80fe53a65..e6ef44897 100644 --- a/srtcore/epoll.cpp +++ b/srtcore/epoll.cpp @@ -71,12 +71,6 @@ modified by using namespace std; using namespace srt::sync; -#if ENABLE_HEAVY_LOGGING -namespace srt { -static ostream& PrintEpollEvent(ostream& os, int events, int et_events = 0); -} -#endif - namespace srt_logging { extern Logger eilog, ealog; @@ -977,31 +971,6 @@ int srt::CEPoll::update_events(const SRTSOCKET& uid, std::set& eids, const namespace srt { -static ostream& PrintEpollEvent(ostream& os, int events, int et_events) -{ - static pair const namemap [] = { - make_pair(SRT_EPOLL_IN, "R"), - make_pair(SRT_EPOLL_OUT, "W"), - make_pair(SRT_EPOLL_ERR, "E"), - make_pair(SRT_EPOLL_UPDATE, "U") - }; - - const int N = (int)Size(namemap); - - for (int i = 0; i < N; ++i) - { - if (events & namemap[i].first) - { - os << "["; - if (et_events & namemap[i].first) - os << "^"; - os << namemap[i].second << "]"; - } - } - - return os; -} - string DisplayEpollResults(const std::map& sockset) { typedef map fmap_t; diff --git a/srtcore/group.cpp b/srtcore/group.cpp index f3931d1d6..4704a66e5 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -4017,14 +4017,6 @@ void CUDTGroup::updateLatestRcv(CUDTSocket* s) } } -struct FExtractGroupID -{ - SRTSOCKET operator()(const groups::SocketData& d) - { - return d.id; - } -}; - void CUDTGroup::getMemberSockets(std::list& w_ids) const { ScopedLock gl (m_GroupLock); @@ -4035,7 +4027,6 @@ void CUDTGroup::getMemberSockets(std::list& w_ids) const } } - void CUDTGroup::activateUpdateEvent(bool still_have_items) { // This function actually reacts on the fact that a socket diff --git a/test/test_epoll.cpp b/test/test_epoll.cpp index 0d4e29a67..38624a62e 100644 --- a/test/test_epoll.cpp +++ b/test/test_epoll.cpp @@ -12,37 +12,6 @@ using namespace std; using namespace srt; -namespace { -ostream& PrintEpollEvent(ostream& os, int events, int et_events = 0) -{ - static pair const namemap [] = { - make_pair(SRT_EPOLL_IN, "R"), - make_pair(SRT_EPOLL_OUT, "W"), - make_pair(SRT_EPOLL_ERR, "E"), - make_pair(SRT_EPOLL_UPDATE, "U") - }; - bool any = false; - - const int N = (int)Size(namemap); - - for (int i = 0; i < N; ++i) - { - if (events & namemap[i].first) - { - os << "["; - if (et_events & namemap[i].first) - os << "^"; - os << namemap[i].second << "]"; - any = true; - } - } - - if (!any) - os << "[]"; - - return os; -} -} TEST(CEPoll, InfiniteWait) diff --git a/testing/srt-test-live.cpp b/testing/srt-test-live.cpp index c4c752667..aa6b45d6a 100644 --- a/testing/srt-test-live.cpp +++ b/testing/srt-test-live.cpp @@ -300,7 +300,7 @@ extern "C" int SrtCheckGroupHook(void* , SRTSOCKET acpsock, int , const sockaddr size = sizeof gt; if (-1 != srt_getsockflag(acpsock, SRTO_GROUPTYPE, >, &size)) { - if (gt < Size(gtypes)) + if (size_t(gt) < Size(gtypes)) Verb() << " type=" << gtypes[gt] << VerbNoEOL; else Verb() << " type=" << int(gt) << VerbNoEOL; From f91f75a9619a1cd7496818527fd2fa8231c18ebb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Mon, 30 Oct 2023 16:00:16 +0100 Subject: [PATCH 10/15] Added group contents check and configurable sleep for group/listener tests --- test/test_epoll.cpp | 91 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 83 insertions(+), 8 deletions(-) diff --git a/test/test_epoll.cpp b/test/test_epoll.cpp index 38624a62e..2c98ec38d 100644 --- a/test/test_epoll.cpp +++ b/test/test_epoll.cpp @@ -548,6 +548,9 @@ TEST(CEPoll, ThreadedUpdate) void testListenerReady(const bool LATE_CALL, size_t nmembers) { + bool is_single = true; + bool want_sleep = !TestEnv::me->OptionPresent("nosleep"); + sockaddr_in sa; memset(&sa, 0, sizeof sa); sa.sin_family = AF_INET; @@ -564,6 +567,7 @@ void testListenerReady(const bool LATE_CALL, size_t nmembers) caller_sock = srt_create_group(SRT_GTYPE_BROADCAST); int on = 1; EXPECT_NE(srt_setsockflag(server_sock, SRTO_GROUPCONNECT, &on, sizeof on), SRT_ERROR); + is_single = false; } else { @@ -588,7 +592,8 @@ void testListenerReady(const bool LATE_CALL, size_t nmembers) // We don't need the caller to be async, it can hang up here. for (size_t i = 0; i < nmembers; ++i) { - connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa]() { + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa, i]() { + std::cout << "[T:" << i << "] CALLING\n"; return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); })); } @@ -596,8 +601,11 @@ void testListenerReady(const bool LATE_CALL, size_t nmembers) std::cout << "STARTED connecting...\n"; } - std::cout << "Sleeping 1s...\n"; - this_thread::sleep_for(chrono::milliseconds(1000)); + if (want_sleep) + { + std::cout << "Sleeping 1s...\n"; + this_thread::sleep_for(chrono::milliseconds(1000)); + } // What is important is that the accepted socket is now reporting in // on the listener socket. So let's create an epoll. @@ -616,7 +624,8 @@ void testListenerReady(const bool LATE_CALL, size_t nmembers) // We don't need the caller to be async, it can hang up here. for (size_t i = 0; i < nmembers; ++i) { - connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa]() { + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa, i]() { + std::cout << "[T:" << i << "] CALLING\n"; return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); })); } @@ -640,8 +649,9 @@ void testListenerReady(const bool LATE_CALL, size_t nmembers) std::cout << "With >1 members, check if there's still UPDATE pending\n"; // Spawn yet another connection within the group, just to get the update auto extra_call = std::async(std::launch::async, [&caller_sock, &sa]() { - return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); - }); + std::cout << "[T:X] CALLING (expected failure)\n"; + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + }); // For 2+ members, additionally check if there AREN'T any // further acceptance members, but there are UPDATEs. EXPECT_EQ(srt_epoll_uwait(eid_postcheck, fdset, 1, 5000), 1); @@ -649,15 +659,80 @@ void testListenerReady(const bool LATE_CALL, size_t nmembers) // SUBSCRIBED EVENTS: IN, UPDATE. // expected: UPDATE only. EXPECT_EQ(fdset[0].events, SRT_EPOLL_UPDATE); - EXPECT_NE(extra_call.get(), SRT_INVALID_SOCK); + SRTSOCKET joined = extra_call.get(); + EXPECT_NE(joined, SRT_INVALID_SOCK); + std::cout << Sprint("Extra joined: @", joined, "\n"); + } + + std::vector gdata; + + if (!is_single) + { + EXPECT_EQ(sock & SRTGROUP_MASK, SRTGROUP_MASK); + // +1 because we have added one more caller to check UPDATE event. + size_t inoutlen = nmembers+1; + gdata.resize(inoutlen); + int groupndata = srt_group_data(sock, gdata.data(), (&inoutlen)); + EXPECT_NE(groupndata, SRT_ERROR); + + std::ostringstream sout; + if (groupndata == SRT_ERROR) + sout << "ERROR: " << srt_getlasterror_str() << " OUTLEN: " << inoutlen << std::endl; + else + { + // Just to display the members + sout << "(Listener) Members: "; + + for (int i = 0; i < groupndata; ++i) + sout << "@" << gdata[i].id << " "; + sout << std::endl; + } + + std::cout << sout.str(); } std::cout << "Joining connector thread(s)\n"; for (size_t i = 0; i < nmembers; ++i) { - EXPECT_NE(connect_res[i].get(), SRT_INVALID_SOCK); + std::cout << "Join: #" << i << ":\n"; + SRTSOCKET called_socket = connect_res[i].get(); + std::cout << "... " << called_socket << std::endl; + EXPECT_NE(called_socket, SRT_INVALID_SOCK); } + if (!is_single) + { + EXPECT_EQ(caller_sock & SRTGROUP_MASK, SRTGROUP_MASK); + // +1 because we have added one more caller to check UPDATE event. + size_t inoutlen = nmembers+1; + gdata.resize(inoutlen); + int groupndata = srt_group_data(caller_sock, gdata.data(), (&inoutlen)); + EXPECT_NE(groupndata, SRT_ERROR); + + std::ostringstream sout; + if (groupndata == SRT_ERROR) + sout << "ERROR: " << srt_getlasterror_str() << " OUTLEN: " << inoutlen << std::endl; + else + { + // Just to display the members + sout << "(Caller) Members: "; + + for (int i = 0; i < groupndata; ++i) + sout << "@" << gdata[i].id << " "; + sout << std::endl; + } + + std::cout << sout.str(); + + if (want_sleep) + { + std::cout << "Sleep for 3 seconds to avoid closing-in-between\n"; + std::this_thread::sleep_for(std::chrono::seconds(3)); + } + } + + std::cout << "Releasing EID resources and all sockets\n"; + srt_epoll_release(eid); srt_epoll_release(eid_postcheck); From 188cbd1b4b2a43c50792b4a37e1a23dd447d6c83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Mon, 30 Oct 2023 17:15:56 +0100 Subject: [PATCH 11/15] Fixed build break on more pedantic compilers --- test/test_epoll.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_epoll.cpp b/test/test_epoll.cpp index 2c98ec38d..c720a1075 100644 --- a/test/test_epoll.cpp +++ b/test/test_epoll.cpp @@ -823,7 +823,7 @@ void testMultipleListenerReady(const bool LATE_CALL) connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa2]() { this_thread::sleep_for(chrono::milliseconds(1)); - return srt_connect(caller_sock, (sockaddr*)& sa2, sizeof(sa)); + return srt_connect(caller_sock, (sockaddr*)& sa2, sizeof(sa2)); })); @@ -856,7 +856,7 @@ void testMultipleListenerReady(const bool LATE_CALL) connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa2]() { this_thread::sleep_for(chrono::milliseconds(1)); - return srt_connect(caller_sock, (sockaddr*)& sa2, sizeof(sa)); + return srt_connect(caller_sock, (sockaddr*)& sa2, sizeof(sa2)); })); std::cout << "STARTED connecting...\n"; From e8cd37d25d45881da518ae794e01be55e7612378 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Tue, 23 Jan 2024 20:05:12 +0100 Subject: [PATCH 12/15] Further fixes for OptionProxy. Some cosmetics in the docs --- apps/apputil.hpp | 31 +++++++++++++++++++++++++------ docs/API/API-functions.md | 6 ++++-- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/apps/apputil.hpp b/apps/apputil.hpp index c79cf984e..1a0b158e0 100644 --- a/apps/apputil.hpp +++ b/apps/apputil.hpp @@ -350,22 +350,41 @@ struct OptionSetterProxy struct OptionProxy { - const OptionSetterProxy& parent; + OptionSetterProxy& parent; SRT_SOCKOPT opt; - template - OptionProxy& operator=(Type&& val) +#define SPEC(type) \ + OptionProxy& operator=(const type& val)\ + {\ + parent.result = srt_setsockflag(parent.s, opt, &val, sizeof val);\ + return *this;\ + } + + SPEC(int32_t); + SPEC(int64_t); + SPEC(bool); +#undef SPEC + + template + OptionProxy& operator=(const char (&val)[N]) + { + parent.result = srt_setsockflag(parent.s, opt, val, N-1); + return *this; + } + + OptionProxy& operator=(const std::string& val) { - Type vc(val); - srt_setsockflag(parent.s, opt, &vc, sizeof vc); + parent.result = srt_setsockflag(parent.s, opt, val.c_str(), val.size()); return *this; } }; - OptionProxy operator[](SRT_SOCKOPT opt) const + OptionProxy operator[](SRT_SOCKOPT opt) { return OptionProxy {*this, opt}; } + + operator int() { return result; } }; inline OptionSetterProxy setopt(SRTSOCKET socket) diff --git a/docs/API/API-functions.md b/docs/API/API-functions.md index 7608d32f0..368216005 100644 --- a/docs/API/API-functions.md +++ b/docs/API/API-functions.md @@ -639,7 +639,7 @@ below). * `addrlen`: INPUT: size of `addr` pointed object. OUTPUT: real size of the returned object -General requirements for parameter correctness: +General requirements for a parameter correctness: * `lsn` must be first [bound](#srt_bind) and [listening](#srt_listen) @@ -656,7 +656,7 @@ depends on the address type used in the `srt_bind` call for `lsn`. If unsure in a particular situation, it is recommended that you use `sockaddr_storage` or `srt::sockaddr_any`. -If the `lsn` listener socket is blocking mode (if +If the `lsn` listener socket is in the blocking mode (if [`SRTO_RCVSYN`](API-socket-options.md#SRTO_RCVSYN) is set to true, which is default), the call will block until the incoming connection is ready for extraction. Otherwise, the call always returns immediately, possibly with @@ -707,6 +707,8 @@ request. This feature is prone to more tricky rules, however: continue blocking. If you want to use only one thread for accepting connections from potentially multiple listening sockets in the blocking mode, you should use [`srt_accept_bond`](#srt_accept_bond) instead. + Note though that this function is actually a wrapper that changes locally + to the nonblocking mode on all these listeners and uses epoll internally. * If at the moment multiple listener sockets have received connection request and you query them all for readiness epoll flags (by calling From 90c28886b93f01a21e7b514277053cc9e6ec4b30 Mon Sep 17 00:00:00 2001 From: Mikolaj Malecki Date: Tue, 10 Sep 2024 14:41:33 +0200 Subject: [PATCH 13/15] Updated. Added new test from #1891. Added doc info --- docs/API/API-socket-options.md | 5 ++ test/test_bonding.cpp | 131 ++++++++++++++++++++++++++++++++- 2 files changed, 133 insertions(+), 3 deletions(-) diff --git a/docs/API/API-socket-options.md b/docs/API/API-socket-options.md index a06f8556d..438b90637 100644 --- a/docs/API/API-socket-options.md +++ b/docs/API/API-socket-options.md @@ -505,6 +505,11 @@ allowed must take this into consideration. It's up to the caller of this function to make this distinction and to take appropriate action depending on the type of entity returned. +Note: this flag should be altered **before** calling `srt_listen`. If you do +this after this call, you might have some pending group connections in the +meantime that will be rejected because group connections are not **yet** +allowed on this listener socket. + When this flag is set to 1 on an accepted socket that is passed to the listener callback handler, it means that this socket is created for a group connection and it will become a member of a group. Note that in this case diff --git a/test/test_bonding.cpp b/test/test_bonding.cpp index 0e48c8a04..eded4c20b 100644 --- a/test/test_bonding.cpp +++ b/test/test_bonding.cpp @@ -6,8 +6,11 @@ #include "gtest/gtest.h" #include "test_env.h" +#include "apputil.hpp" // Note: declares CreateAddr, but not srt::CreateAddr #include "srt.h" +#include "logging_api.h" +#include "common.h" #include "netinet_any.h" TEST(Bonding, SRTConnectGroup) @@ -376,7 +379,7 @@ TEST(Bonding, Options) #endif int allow = 1; ASSERT_NE(srt_setsockflag(lsn, SRTO_GROUPCONNECT, &allow, sizeof allow), SRT_ERROR); - sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET); + sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET); ASSERT_NE(srt_bind(lsn, sa.get(), sa.size()), SRT_ERROR); ASSERT_NE(srt_listen(lsn, 1), SRT_ERROR); started = true; @@ -413,7 +416,7 @@ TEST(Bonding, Options) } // Now the thread is accepting, so we call the connect. - sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET); + sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET); SRTSOCKET member = srt_connect(grp, sa.get(), sa.size()); // We've released the mutex and signaled the CV, so accept should proceed now. @@ -507,7 +510,7 @@ TEST(Bonding, InitialFailure) int allow = 1; ASSERT_NE(srt_setsockflag(lsn, SRTO_GROUPCONNECT, &allow, sizeof allow), SRT_ERROR); - sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET); + sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET); ASSERT_NE(srt_bind(lsn, sa.get(), sa.size()), SRT_ERROR); ASSERT_NE(srt_listen(lsn, 5), SRT_ERROR); @@ -564,3 +567,125 @@ TEST(Bonding, InitialFailure) srt_close(lsn); } +void SetLongSilenceTolerant(const SRTSOCKET s) +{ + int longtime = 100000; + + srt_setsockflag(s, SRTO_CONNTIMEO, &longtime, sizeof longtime); + srt_setsockflag(s, SRTO_PEERIDLETIMEO, &longtime, sizeof longtime); +} + +TEST(Bonding, DeadLinkUpdate) +{ + using namespace std; + using namespace std::chrono; + + srt::TestInit srtinit; + + SRTSOCKET listener = srt_create_socket(); + const SRTSOCKET group = srt_create_group(SRT_GTYPE_BACKUP); + + SetLongSilenceTolerant(listener); + SetLongSilenceTolerant(group); + + srt::sockaddr_any sa(AF_INET); + + inet_pton(AF_INET, "127.0.0.1", sa.get_addr()); + + sa.hport(5555); + + srt_bind(listener, sa.get(), sa.size()); + srt::setopt(listener)[SRTO_GROUPCONNECT] = 1; + srt_listen(listener, 1); + char srcbuf [] = "1234ABCD"; + + thread td = thread([&]() { + cout << "[T] Connecting 1...\n"; + const SRTSOCKET member1 = srt_connect(group, sa.get(), sa.size()); + EXPECT_NE(member1, SRT_INVALID_SOCK); + // Now wait 3s + cout << "[T] Link 1 established. Wait 3s...\n"; + this_thread::sleep_for(seconds(3)); + + cout << "[T] Connecting 2...\n"; + // Make a second connection + const SRTSOCKET member2 = srt_connect(group, sa.get(), sa.size()); + EXPECT_NE(member2, SRT_INVALID_SOCK); + + if (member2 == SRT_INVALID_SOCK || member1 == SRT_INVALID_SOCK) + { + srt_close(member1); + srt_close(member2); + cout << "[T] Test already failed, exitting\n"; + return; + } + + cout << "[T] Link 2 established. Wait 3s...\n"; + // Again wait 3s + this_thread::sleep_for(seconds(3)); + + cout << "[T] Killing link 1...\n"; + // Now close the first connection + srt_close(member1); + + // Now send the data and see if they are received + cout << "[T] Sending: size=" << (sizeof srcbuf) << " Content: '" << srcbuf << "'...\n"; + int nsent = srt_send(group, srcbuf, sizeof srcbuf); + EXPECT_NE(nsent, -1) << "srt_send:" << srt_getlasterror_str(); + + cout << "[T] Wait 3s...\n"; + // Again wait 3s + this_thread::sleep_for(seconds(3)); + + cout << "[T] Killing the group and exitting.\n"; + // And close + srt_close(group); + + cout << "[T] exit\n"; + }); + + cout << "Accepting (10s timeout)...\n"; + + SRTSOCKET lsnra [] = { listener }; + + // Using srt_accept_bond to apply accept timeout + const SRTSOCKET acp = srt_accept_bond(lsnra, 1, 10*1000); + + EXPECT_NE(acp, -1) << "srt_accept:" << srt_getlasterror_str(); + EXPECT_EQ(acp & SRTGROUP_MASK, SRTGROUP_MASK); + + // Close and set up the listener again. + srt_close(listener); + if (acp != SRT_ERROR) + { + listener = srt_create_socket(); + srt_bind(listener, sa.get(), sa.size()); + srt::setopt(listener)[SRTO_GROUPCONNECT] = 1; + srt_listen(listener, 1); + + cout << "Group accepted. Receiving...\n"; + char buf[1316] = ""; + const int nrecv = srt_recv(acp, buf, 1316); + int syserr, err; + err = srt_getlasterror(&syserr); + EXPECT_NE(nrecv, -1) << "srt_recv:" << srt_getlasterror_str(); + + cout << "Received: val=" << nrecv << " Content: '" << buf << "'\n"; + if (nrecv == -1) + { + cout << "ERROR: " << srt_strerror(err, syserr) << endl; + cout << "STATUS: " << srt_logging::SockStatusStr(srt_getsockstate(acp)) << endl; + } + else + { + EXPECT_EQ(strcmp(srcbuf, buf), 0); + } + + cout << "Closing.\n"; + srt_close(acp); + srt_close(listener); + } + + td.join(); +} + From a5161409b6399f06e2b639caf9e03b1f8f77f13f Mon Sep 17 00:00:00 2001 From: Mikolaj Malecki Date: Tue, 10 Sep 2024 15:49:14 +0200 Subject: [PATCH 14/15] Fixed right types for EXPECT_EQ calls in new tests --- test/test_epoll.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_epoll.cpp b/test/test_epoll.cpp index 2c8d58925..335a0b3ef 100644 --- a/test/test_epoll.cpp +++ b/test/test_epoll.cpp @@ -656,7 +656,7 @@ void testListenerReady(const bool LATE_CALL, size_t nmembers) // SUBSCRIBED EVENTS: IN, UPDATE. // expected: UPDATE only. - EXPECT_EQ(fdset[0].events, SRT_EPOLL_UPDATE); + EXPECT_EQ(SRT_EPOLL_OPT(fdset[0].events), SRT_EPOLL_UPDATE); SRTSOCKET joined = extra_call.get(); EXPECT_NE(joined, SRT_INVALID_SOCK); std::cout << Sprint("Extra joined: @", joined, "\n"); @@ -915,7 +915,7 @@ void testMultipleListenerReady(const bool LATE_CALL) // SUBSCRIBED EVENTS: IN, UPDATE. // expected: UPDATE only. - EXPECT_EQ(fdset[0].events, SRT_EPOLL_UPDATE); + EXPECT_EQ(SRT_EPOLL_OPT(fdset[0].events), SRT_EPOLL_UPDATE); EXPECT_NE(extra_call.get(), SRT_INVALID_SOCK); std::cout << "Joining connector thread(s)\n"; From 461def598584d779ef9ee044c7ba7631d4da7109 Mon Sep 17 00:00:00 2001 From: Mikolaj Malecki Date: Wed, 11 Sep 2024 08:52:57 +0200 Subject: [PATCH 15/15] Added setting UPDATE event on group on new connection. Fixed test with accept timeout --- srtcore/api.cpp | 9 +++++++++ test/test_bonding.cpp | 5 +---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 0cef76383..0c6df0ac0 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -805,6 +805,15 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, // acknowledge INTERNAL users waiting for new connections on the listening socket // that are reported when a new socket is connected within an already connected group. m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_UPDATE, true); +#if ENABLE_BONDING + // Note that the code in this current IF branch can only be executed in case + // of group members. Otherwise should_submit_to_accept will be always true. + if (ns->m_GroupOf) + { + HLOGC(gmlog.Debug, log << "GROUP UPDATE $" << ns->m_GroupOf->id() << " per connected socket @" << ns->m_SocketID); + m_EPoll.update_events(ns->m_GroupOf->id(), ns->m_GroupOf->m_sPollID, SRT_EPOLL_UPDATE, true); + } +#endif CGlobEvent::triggerEvent(); } diff --git a/test/test_bonding.cpp b/test/test_bonding.cpp index eded4c20b..1d38e3a41 100644 --- a/test/test_bonding.cpp +++ b/test/test_bonding.cpp @@ -640,15 +640,12 @@ TEST(Bonding, DeadLinkUpdate) cout << "[T] Killing the group and exitting.\n"; // And close srt_close(group); - cout << "[T] exit\n"; }); cout << "Accepting (10s timeout)...\n"; - - SRTSOCKET lsnra [] = { listener }; - // Using srt_accept_bond to apply accept timeout + SRTSOCKET lsnra [] = { listener }; const SRTSOCKET acp = srt_accept_bond(lsnra, 1, 10*1000); EXPECT_NE(acp, -1) << "srt_accept:" << srt_getlasterror_str();