diff --git a/CMakeLists.txt b/CMakeLists.txt index a429cf495..8951dc46c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -73,7 +73,12 @@ if (NOT MICROSOFT) # that ENABLE_DEBUG is set as it should. if (ENABLE_DEBUG EQUAL 2) set (CMAKE_BUILD_TYPE "RelWithDebInfo") - add_definitions(-DNDEBUG) + if (ENABLE_ASSERT) + # Add _DEBUG macro if explicitly requested, to enable SRT_ASSERT(). + add_definitions(-D_DEBUG) + else() + add_definitions(-DNDEBUG) + endif() elseif (ENABLE_DEBUG) # 1, ON, YES, TRUE, Y, or any other non-zero number set (CMAKE_BUILD_TYPE "Debug") diff --git a/srtcore/api.cpp b/srtcore/api.cpp index cb9e5e65d..f9ac6a8d1 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -2662,31 +2662,34 @@ void srt::CUDTUnited::checkBrokenSockets() for (sockets_t::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++j) { + CUDTSocket* ps = j->second; + CUDT& u = ps->core(); + // HLOGC(smlog.Debug, log << "checking CLOSED socket: " << j->first); - if (!is_zero(j->second->core().m_tsLingerExpiration)) + if (!is_zero(u.m_tsLingerExpiration)) { // asynchronous close: - if ((!j->second->core().m_pSndBuffer) || (0 == j->second->core().m_pSndBuffer->getCurrBufSize()) || - (j->second->core().m_tsLingerExpiration <= steady_clock::now())) + if ((!u.m_pSndBuffer) || (0 == u.m_pSndBuffer->getCurrBufSize()) || + (u.m_tsLingerExpiration <= steady_clock::now())) { - HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << j->second->m_SocketID); - j->second->core().m_tsLingerExpiration = steady_clock::time_point(); - j->second->core().m_bClosing = true; - j->second->m_tsClosureTimeStamp = steady_clock::now(); + HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << ps->m_SocketID); + u.m_tsLingerExpiration = steady_clock::time_point(); + u.m_bClosing = true; + ps->m_tsClosureTimeStamp = steady_clock::now(); } } // timeout 1 second to destroy a socket AND it has been removed from // RcvUList const steady_clock::time_point now = steady_clock::now(); - const steady_clock::duration closed_ago = now - j->second->m_tsClosureTimeStamp; + const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp; if (closed_ago > seconds_from(1)) { - CRNode* rnode = j->second->core().m_pRNode; + CRNode* rnode = u.m_pRNode; if (!rnode || !rnode->m_bOnList) { HLOGC(smlog.Debug, - log << "checkBrokenSockets: @" << j->second->m_SocketID << " closed " + log << "checkBrokenSockets: @" << ps->m_SocketID << " closed " << FormatDuration(closed_ago) << " ago and removed from RcvQ - will remove"); // HLOGC(smlog.Debug, log << "will unref socket: " << j->first); diff --git a/srtcore/logging.h b/srtcore/logging.h index 608234eab..c17781c24 100644 --- a/srtcore/logging.h +++ b/srtcore/logging.h @@ -154,7 +154,7 @@ struct SRT_API LogDispatcher LogLevel::type level; static const size_t MAX_PREFIX_SIZE = 32; char prefix[MAX_PREFIX_SIZE+1]; - bool enabled; + srt::sync::atomic enabled; LogConfig* src_config; bool isset(int flg) { return (src_config->flags & flg) != 0; } diff --git a/test/test_env.h b/test/test_env.h index 68ec516b3..e20905351 100644 --- a/test/test_env.h +++ b/test/test_env.h @@ -63,18 +63,26 @@ class TestInit class UniqueSocket { int32_t sock; + std::string lab, f; + int l; public: - UniqueSocket(int32_t s): sock(s) + UniqueSocket(int32_t s, const char* label, const char* file, int line): sock(s) { if (s == -1) throw std::invalid_argument("Invalid socket"); + lab = label; + f = file; + l = line; } - UniqueSocket(): sock(-1) +#define MAKE_UNIQUE_SOCK(name, label, expr) srt::UniqueSocket name (expr, label, __FILE__, __LINE__) + + UniqueSocket(): sock(-1), l(0) { } + void close(); ~UniqueSocket(); operator int32_t() const diff --git a/test/test_epoll.cpp b/test/test_epoll.cpp index 116978c08..a8a88aa7a 100644 --- a/test/test_epoll.cpp +++ b/test/test_epoll.cpp @@ -66,7 +66,7 @@ TEST(CEPoll, WaitEmptyCall) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); ASSERT_NE(client_sock, SRT_ERROR); const int no = 0; @@ -89,7 +89,7 @@ TEST(CEPoll, UWaitEmptyCall) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); ASSERT_NE(client_sock, SRT_ERROR); const int no = 0; @@ -112,7 +112,7 @@ TEST(CEPoll, WaitAllSocketsInEpollReleased) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); ASSERT_NE(client_sock, SRT_ERROR); const int yes = 1; @@ -146,7 +146,7 @@ TEST(CEPoll, WaitAllSocketsInEpollReleased2) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); ASSERT_NE(client_sock, SRT_ERROR); const int yes = 1; @@ -174,7 +174,7 @@ TEST(CEPoll, WrongEpoll_idOnAddUSock) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); ASSERT_NE(client_sock, SRT_ERROR); const int no = 0; @@ -197,7 +197,7 @@ TEST(CEPoll, HandleEpollEvent) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); EXPECT_NE(client_sock, SRT_ERROR); const int yes = 1; @@ -258,7 +258,7 @@ TEST(CEPoll, NotifyConnectionBreak) srt::TestInit srtinit; // 1. Prepare client - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); ASSERT_NE(client_sock, SRT_ERROR); const int yes SRT_ATR_UNUSED = 1; @@ -280,7 +280,7 @@ TEST(CEPoll, NotifyConnectionBreak) ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa_client.sin_addr), 1); // 2. Prepare server - srt::UniqueSocket server_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(server_sock, "server_sock", srt_create_socket()); ASSERT_NE(server_sock, SRT_ERROR); ASSERT_NE(srt_setsockopt(server_sock, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // for async connect @@ -372,7 +372,7 @@ TEST(CEPoll, HandleEpollEvent2) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); EXPECT_NE(client_sock, SRT_ERROR); const int yes = 1; @@ -433,7 +433,7 @@ TEST(CEPoll, HandleEpollNoEvent) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); EXPECT_NE(client_sock, SRT_ERROR); const int yes = 1; @@ -483,7 +483,7 @@ TEST(CEPoll, ThreadedUpdate) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); EXPECT_NE(client_sock, SRT_ERROR); const int no = 0; diff --git a/test/test_file_transmission.cpp b/test/test_file_transmission.cpp index 97f9e684a..bfd668ac7 100644 --- a/test/test_file_transmission.cpp +++ b/test/test_file_transmission.cpp @@ -25,12 +25,14 @@ #include #include #include +#include //#pragma comment (lib, "ws2_32.lib") TEST(Transmission, FileUpload) { srt::TestInit srtinit; + srtinit.HandlePerTestOptions(); // Generate the source file // We need a file that will contain more data @@ -93,7 +95,7 @@ TEST(Transmission, FileUpload) // Start listener-receiver thread - bool thread_exit = false; + std::atomic thread_exit { false }; auto client = std::thread([&] { @@ -116,12 +118,17 @@ TEST(Transmission, FileUpload) for (;;) { int n = srt_recv(accepted_sock, buf.data(), 1456); - ASSERT_NE(n, SRT_ERROR); + EXPECT_NE(n, SRT_ERROR) << srt_getlasterror_str(); if (n == 0) { std::cerr << "Received 0 bytes, breaking.\n"; break; } + else if (n == -1) + { + std::cerr << "READ FAILED, breaking anyway\n"; + break; + } // Write to file any amount of data received copyfile.write(buf.data(), n); @@ -173,7 +180,7 @@ TEST(Transmission, FileUpload) std::cout << "Sockets closed, joining receiver thread\n"; client.join(); - std::ifstream tarfile("file.target"); + std::ifstream tarfile("file.target", std::ios::in | std::ios::binary); EXPECT_EQ(!!tarfile, true); tarfile.seekg(0, std::ios::end); @@ -182,8 +189,14 @@ TEST(Transmission, FileUpload) std::cout << "Comparing files\n"; // Compare files - tarfile.seekg(0, std::ios::end); - ifile.seekg(0, std::ios::beg); + + // Theoretically it should work if you just rewind to 0, but + // on Windows this somehow doesn't work. + tarfile.close(); + tarfile.open("file.target", std::ios::in | std::ios::binary); + + ifile.close(); + ifile.open("file.source", std::ios::in | std::ios::binary); for (size_t i = 0; i < tar_size; ++i) { diff --git a/test/test_main.cpp b/test/test_main.cpp index cc5acc487..cb3b7c5c5 100644 --- a/test/test_main.cpp +++ b/test/test_main.cpp @@ -113,6 +113,11 @@ void TestInit::HandlePerTestOptions() { srt_setloglevel(LOG_DEBUG); } + + if (TestEnv::me->OptionPresent("lognote")) + { + srt_setloglevel(LOG_NOTICE); + } } // Copied from ../apps/apputil.cpp, can't really link this file here. @@ -178,7 +183,14 @@ sockaddr_any CreateAddr(const std::string& name, unsigned short port, int pref_f UniqueSocket::~UniqueSocket() { - srt_close(sock); + // Could be closed explicitly + if (sock != -1) + close(); +} + +void UniqueSocket::close() +{ + EXPECT_NE(srt_close(sock), SRT_ERROR) << lab << " CREATED: "<< f << ":" << l; } } diff --git a/test/test_many_connections.cpp b/test/test_many_connections.cpp index 20deccf70..29cd6bdd2 100644 --- a/test/test_many_connections.cpp +++ b/test/test_many_connections.cpp @@ -135,6 +135,8 @@ TEST_F(TestConnection, Multiple) cerr << "Opening " << NSOCK << " connections\n"; + bool overall_test = true; + for (size_t i = 0; i < NSOCK; i++) { m_connections[i] = srt_create_socket(); @@ -145,13 +147,18 @@ TEST_F(TestConnection, Multiple) int conntimeo = 60; srt_setsockflag(m_connections[i], SRTO_CONNTIMEO, &conntimeo, sizeof conntimeo); + SRTSOCKET connres = SRT_INVALID_SOCK; + //cerr << "Connecting #" << i << " to " << sockaddr_any(psa).str() << "...\n"; //cerr << "Connecting to: " << sockaddr_any(psa).str() << endl; - ASSERT_NE(srt_connect(m_connections[i], psa, sizeof lsa), SRT_ERROR); + connres = srt_connect(m_connections[i], psa, sizeof lsa); + EXPECT_NE(connres, SRT_INVALID_SOCK) << "conn #" << i << ": " << srt_getlasterror_str(); + if (connres == SRT_INVALID_SOCK) + overall_test = false; // Set now async sending so that sending isn't blocked int no = 0; - ASSERT_NE(srt_setsockflag(m_connections[i], SRTO_SNDSYN, &no, sizeof no), -1); + EXPECT_NE(srt_setsockflag(m_connections[i], SRTO_SNDSYN, &no, sizeof no), -1); } for (size_t j = 1; j <= 100; j++) @@ -170,6 +177,7 @@ TEST_F(TestConnection, Multiple) EXPECT_FALSE(m_accept_exit) << "AcceptLoop already broken for some reason!"; // Up to this moment the server sock should survive + cerr << "Closing server socket\n"; // Close server socket to break the accept loop EXPECT_EQ(srt_close(m_server_sock), 0); @@ -177,6 +185,8 @@ TEST_F(TestConnection, Multiple) cerr << "Synchronize with the accepting thread\n"; ex.wait(); cerr << "Synchronization done\n"; + + ASSERT_TRUE(overall_test); } diff --git a/test/test_reuseaddr.cpp b/test/test_reuseaddr.cpp index fe9027311..fa92bc635 100644 --- a/test/test_reuseaddr.cpp +++ b/test/test_reuseaddr.cpp @@ -1,5 +1,6 @@ #include #include +#include #ifndef _WIN32 #include #endif @@ -30,7 +31,7 @@ struct AtReturnJoin // iphlp library to be attached to the executable, which is kinda // problematic. Temporarily block tests using this function on Windows. -std::string GetLocalIP(int af = AF_UNSPEC) +static std::string GetLocalIP(int af = AF_UNSPEC) { std::cout << "!!!WARNING!!!: GetLocalIP not supported, test FORCEFULLY passed\n"; return ""; @@ -50,7 +51,7 @@ struct IfAddr } }; -std::string GetLocalIP(int af = AF_UNSPEC) +static std::string GetLocalIP(int af = AF_UNSPEC) { struct ifaddrs * ifa=NULL; void * tmpAddrPtr=NULL; @@ -101,306 +102,364 @@ std::string GetLocalIP(int af = AF_UNSPEC) } #endif -int client_pollid = SRT_ERROR; -SRTSOCKET g_client_sock = SRT_ERROR; - -void clientSocket(std::string ip, int port, bool expect_success) +class ReuseAddr : public srt::Test { - int yes = 1; - int no = 0; - - int family = AF_INET; - std::string famname = "IPv4"; - if (ip.substr(0, 2) == "6.") - { - family = AF_INET6; - ip = ip.substr(2); - famname = "IPv6"; - } + int m_server_pollid = SRT_ERROR; - std::cout << "[T/C] Creating client socket\n"; +protected: - g_client_sock = srt_create_socket(); - ASSERT_NE(g_client_sock, SRT_ERROR); + std::string showEpollContents(const char* label, int* array, int length) + { + std::ostringstream out; + out << label << ":["; + if (length) + { + // Now is at least 1 + out << "@" << array[0]; - ASSERT_NE(srt_setsockopt(g_client_sock, 0, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // for async connect - ASSERT_NE(srt_setsockflag(g_client_sock, SRTO_SENDER, &yes, sizeof yes), SRT_ERROR); + for (int i = 1; i < length; ++i) + out << " @" << array[i]; + } + out << "]"; + return out.str(); + } - ASSERT_NE(srt_setsockopt(g_client_sock, 0, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); + struct UniquePollid + { + int pollid = SRT_ERROR; + UniquePollid() + { + pollid = srt_epoll_create(); + } - int epoll_out = SRT_EPOLL_OUT; - srt_epoll_add_usock(client_pollid, g_client_sock, &epoll_out); + ~UniquePollid() + { + srt_epoll_release(pollid); + } - sockaddr_any sa = srt::CreateAddr(ip, port, family); + operator int() const + { + return pollid; + } + }; - std::cout << "[T/C] Connecting to: " << sa.str() << " (" << famname << ")" << std::endl; + void clientSocket(SRTSOCKET client_sock, std::string ip, int port, bool expect_success) + { + using namespace std; - int connect_res = srt_connect(g_client_sock, sa.get(), sa.size()); + int yes = 1; + int no = 0; - if (connect_res == -1) - { - std::cout << "srt_connect: " << srt_getlasterror_str() << std::endl; - } + int family = AF_INET; + string famname = "IPv4"; + if (ip.substr(0, 2) == "6.") + { + family = AF_INET6; + ip = ip.substr(2); + famname = "IPv6"; + } - if (expect_success) - { - EXPECT_NE(connect_res, -1); - if (connect_res == -1) - return; + cout << "[T/C] Setting up client socket\n"; + ASSERT_NE(client_sock, SRT_INVALID_SOCK); + ASSERT_EQ(srt_getsockstate(client_sock), SRTS_INIT); - // Socket readiness for connection is checked by polling on WRITE allowed sockets. + EXPECT_NE(srt_setsockflag(client_sock, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // for async connect + EXPECT_NE(srt_setsockflag(client_sock, SRTO_SENDER, &yes, sizeof yes), SRT_ERROR); + EXPECT_NE(srt_setsockflag(client_sock, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); - if (connect_res != -1) - { - int rlen = 2; - SRTSOCKET read[2]; + UniquePollid client_pollid; + ASSERT_NE(int(client_pollid), SRT_ERROR); - int wlen = 2; - SRTSOCKET write[2]; + int epoll_out = SRT_EPOLL_OUT; + srt_epoll_add_usock(client_pollid, client_sock, &epoll_out); - std::cout << "[T/C] Waiting for connection readiness...\n"; + sockaddr_any sa = srt::CreateAddr(ip, port, family); - EXPECT_NE(srt_epoll_wait(client_pollid, read, &rlen, - write, &wlen, - -1, // -1 is set for debuging purpose. - // in case of production we need to set appropriate value - 0, 0, 0, 0), SRT_ERROR); + cout << "[T/C] Connecting to: " << sa.str() << " (" << famname << ")" << endl; + int connect_res = srt_connect(client_sock, sa.get(), sa.size()); - EXPECT_EQ(rlen, 0); // get exactly one write event without reads - EXPECT_EQ(wlen, 1); // get exactly one write event without reads - EXPECT_EQ(write[0], g_client_sock); // for our client socket + if (connect_res == -1) + { + cout << "srt_connect: " << srt_getlasterror_str() << endl; + } - char buffer[1316] = {1, 2, 3, 4}; - EXPECT_NE(srt_sendmsg(g_client_sock, buffer, sizeof buffer, - -1, // infinit ttl - true // in order must be set to true - ), - SRT_ERROR); + if (expect_success) + { + EXPECT_NE(connect_res, -1); + if (connect_res == -1) + return; + + // Socket readiness for connection is checked by polling on WRITE allowed sockets. + + if (connect_res != -1) + { + int rlen = 2; + SRTSOCKET read[2]; + + int wlen = 2; + SRTSOCKET write[2]; + + cout << "[T/C] Waiting for connection readiness...\n"; + + EXPECT_NE(srt_epoll_wait(client_pollid, read, &rlen, + write, &wlen, + -1, // -1 is set for debuging purpose. + // in case of production we need to set appropriate value + 0, 0, 0, 0), SRT_ERROR) << srt_getlasterror_str(); + + EXPECT_EQ(rlen, 0) << showEpollContents("[T/C] R", read, rlen); // get exactly one write event without reads + EXPECT_EQ(wlen, 1) << showEpollContents("[T/C] W", write, wlen); // get exactly one write event without reads + EXPECT_EQ(write[0], client_sock); // for our client socket + + char buffer[1316] = {1, 2, 3, 4}; + EXPECT_NE(srt_sendmsg(client_sock, buffer, sizeof buffer, + -1, // infinit ttl + true // in order must be set to true + ), + SRT_ERROR); + } + else + { + cout << "[T/C] (NOT TESTING TRANSMISSION - CONNECTION FAILED ALREADY)\n"; + } } else { - std::cout << "[T/C] (NOT TESTING TRANSMISSION - CONNECTION FAILED ALREADY)\n"; + EXPECT_EQ(connect_res, -1); } - } - else - { - EXPECT_EQ(connect_res, -1); + + cout << "[T/C] Client exit\n"; } - std::cout << "[T/C] Client exit\n"; -} + SRTSOCKET prepareServerSocket() + { + SRTSOCKET bindsock = srt_create_socket(); + EXPECT_NE(bindsock, SRT_ERROR); -int server_pollid = SRT_ERROR; + int yes = 1; + int no = 0; -SRTSOCKET prepareSocket() -{ - SRTSOCKET bindsock = srt_create_socket(); - EXPECT_NE(bindsock, SRT_ERROR); + EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // for async connect + EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); - int yes = 1; - int no = 0; + int epoll_in = SRT_EPOLL_IN; - EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // for async connect - EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); + std::cout << "[T/S] Listener/binder sock @" << bindsock << " added to m_server_pollid\n"; + srt_epoll_add_usock(m_server_pollid, bindsock, &epoll_in); - int epoll_in = SRT_EPOLL_IN; + return bindsock; + } - std::cout << "[T/S] Listener/binder sock @" << bindsock << " added to server_pollid\n"; - srt_epoll_add_usock(server_pollid, bindsock, &epoll_in); + bool bindSocket(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) + { + sockaddr_any sa = srt::CreateAddr(ip, port, AF_INET); - return bindsock; -} + std::string fam = (sa.family() == AF_INET) ? "IPv4" : "IPv6"; -bool bindSocket(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) -{ - sockaddr_any sa = srt::CreateAddr(ip, port, AF_INET); + std::cout << "[T/S] Bind @" << bindsock << " to: " << sa.str() << " (" << fam << ")" << std::endl; - std::string fam = (sa.family() == AF_INET) ? "IPv4" : "IPv6"; + int bind_res = srt_bind(bindsock, sa.get(), sa.size()); - std::cout << "[T/S] Bind @" << bindsock << " to: " << sa.str() << " (" << fam << ")" << std::endl; + std::cout << "[T/S] ... result " << bind_res << " (expected to " + << (expect_success ? "succeed" : "fail") << ")\n"; - int bind_res = srt_bind(bindsock, sa.get(), sa.size()); + if (!expect_success) + { + std::cout << "[T/S] Binding should fail: " << srt_getlasterror_str() << std::endl; + EXPECT_EQ(bind_res, SRT_ERROR); + return false; + } - std::cout << "[T/S] ... result " << bind_res << " (expected to " - << (expect_success ? "succeed" : "fail") << ")\n"; + EXPECT_NE(bind_res, SRT_ERROR); + return true; + } - if (!expect_success) + bool bindListener(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) { - std::cout << "[T/S] Binding should fail: " << srt_getlasterror_str() << std::endl; - EXPECT_EQ(bind_res, SRT_ERROR); - return false; - } + if (!bindSocket(bindsock, ip, port, expect_success)) + return false; - EXPECT_NE(bind_res, SRT_ERROR); - return true; -} + EXPECT_NE(srt_listen(bindsock, SOMAXCONN), SRT_ERROR); -bool bindListener(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) -{ - if (!bindSocket(bindsock, ip, port, expect_success)) - return false; + return true; + } - EXPECT_NE(srt_listen(bindsock, SOMAXCONN), SRT_ERROR); + SRTSOCKET createListener(std::string ip, int port, bool expect_success) + { + std::cout << "[T/S] serverSocket: creating listener socket\n"; - return true; -} + SRTSOCKET bindsock = prepareServerSocket(); -SRTSOCKET createListener(std::string ip, int port, bool expect_success) -{ - std::cout << "[T/S] serverSocket: creating listener socket\n"; + if (!bindListener(bindsock, ip, port, expect_success)) + return SRT_INVALID_SOCK; - SRTSOCKET bindsock = prepareSocket(); + return bindsock; + } - if (!bindListener(bindsock, ip, port, expect_success)) - return SRT_INVALID_SOCK; + SRTSOCKET createBinder(std::string ip, int port, bool expect_success) + { + std::cout << "[T/S] serverSocket: creating binder socket\n"; - return bindsock; -} + SRTSOCKET bindsock = prepareServerSocket(); -SRTSOCKET createBinder(std::string ip, int port, bool expect_success) -{ - std::cout << "[T/S] serverSocket: creating binder socket\n"; + if (!bindSocket(bindsock, ip, port, expect_success)) + { + srt_close(bindsock); + return SRT_INVALID_SOCK; + } - SRTSOCKET bindsock = prepareSocket(); + return bindsock; + } - if (!bindSocket(bindsock, ip, port, expect_success)) + void testAccept(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) { - srt_close(bindsock); - return SRT_INVALID_SOCK; - } + MAKE_UNIQUE_SOCK(client_sock, "[T/C]connect", srt_create_socket()); - return bindsock; -} + auto run = [this, &client_sock, ip, port, expect_success]() { clientSocket(client_sock, ip, port, expect_success); }; -void testAccept(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) -{ + auto launched = std::async(std::launch::async, run); - auto run = [ip, port, expect_success]() { clientSocket(ip, port, expect_success); }; + AtReturnJoin atreturn_join {launched}; - auto launched = std::async(std::launch::async, run); + { // wait for connection from client + int rlen = 2; + SRTSOCKET read[2]; - AtReturnJoin atreturn_join {launched}; + int wlen = 2; + SRTSOCKET write[2]; - { // wait for connection from client - int rlen = 2; - SRTSOCKET read[2]; + std::cout << "[T/S] Wait 10s on E" << m_server_pollid << " for acceptance on @" << bindsock << " ...\n"; - int wlen = 2; - SRTSOCKET write[2]; + EXPECT_NE(srt_epoll_wait(m_server_pollid, + read, &rlen, + write, &wlen, + 10000, // -1 is set for debuging purpose. + // in case of production we need to set appropriate value + 0, 0, 0, 0), SRT_ERROR) << srt_getlasterror_str(); - std::cout << "[T/S] Wait 10s for acceptance on @" << bindsock << " ...\n"; - ASSERT_NE(srt_epoll_wait(server_pollid, - read, &rlen, - write, &wlen, - 10000, // -1 is set for debuging purpose. - // in case of production we need to set appropriate value - 0, 0, 0, 0), SRT_ERROR ); + EXPECT_EQ(rlen, 1) << showEpollContents("[T/S] R", read, rlen); // get exactly one read event without writes + EXPECT_EQ(wlen, 0) << showEpollContents("[T/S] W", write, wlen); // get exactly one read event without writes + ASSERT_EQ(read[0], bindsock); // read event is for bind socket + } + { + sockaddr_any scl; + MAKE_UNIQUE_SOCK(accepted_sock, "[T/S]accept", srt_accept(bindsock, scl.get(), &scl.len)); - ASSERT_EQ(rlen, 1); // get exactly one read event without writes - ASSERT_EQ(wlen, 0); // get exactly one read event without writes - ASSERT_EQ(read[0], bindsock); // read event is for bind socket - } + if (accepted_sock == -1) + { + std::cout << "srt_accept: " << srt_getlasterror_str() << std::endl; + } + EXPECT_NE(accepted_sock.ref(), SRT_INVALID_SOCK); - sockaddr_any scl; + sockaddr_any showacp = (sockaddr*)&scl; + std::cout << "[T/S] Accepted from: " << showacp.str() << std::endl; - SRTSOCKET accepted_sock = srt_accept(bindsock, scl.get(), &scl.len); - if (accepted_sock == -1) - { - std::cout << "srt_accept: " << srt_getlasterror_str() << std::endl; - } - ASSERT_NE(accepted_sock, SRT_INVALID_SOCK); + int epoll_in = SRT_EPOLL_IN; + srt_epoll_add_usock(m_server_pollid, accepted_sock, &epoll_in); // wait for input - sockaddr_any showacp = (sockaddr*)&scl; - std::cout << "[T/S] Accepted from: " << showacp.str() << std::endl; + char buffer[1316]; + { // wait for 1316 packet from client + int rlen = 2; + SRTSOCKET read[2]; - int epoll_in = SRT_EPOLL_IN; - srt_epoll_add_usock(server_pollid, accepted_sock, &epoll_in); // wait for input + int wlen = 2; + SRTSOCKET write[2]; - char buffer[1316]; - { // wait for 1316 packet from client - int rlen = 2; - SRTSOCKET read[2]; + std::cout << "[T/S] Wait for data reception...\n"; - int wlen = 2; - SRTSOCKET write[2]; + EXPECT_NE(srt_epoll_wait(m_server_pollid, + read, &rlen, + write, &wlen, + -1, // -1 is set for debuging purpose. + // in case of production we need to set appropriate value + 0, 0, 0, 0), SRT_ERROR) << srt_getlasterror_str(); - std::cout << "[T/S] Wait for data reception...\n"; - ASSERT_NE(srt_epoll_wait(server_pollid, - read, &rlen, - write, &wlen, - -1, // -1 is set for debuging purpose. - // in case of production we need to set appropriate value - 0, 0, 0, 0), SRT_ERROR ); + EXPECT_EQ(rlen, 1) << showEpollContents("[T/S] R", read, rlen); // get exactly one read event without writes + EXPECT_EQ(wlen, 0) << showEpollContents("[T/S] W", write, wlen); // get exactly one read event without writes + EXPECT_EQ(read[0], accepted_sock.ref()); // read event is for bind socket + } + char pattern[4] = {1, 2, 3, 4}; - ASSERT_EQ(rlen, 1); // get exactly one read event without writes - ASSERT_EQ(wlen, 0); // get exactly one read event without writes - ASSERT_EQ(read[0], accepted_sock); // read event is for bind socket - } + EXPECT_EQ(srt_recvmsg(accepted_sock, buffer, sizeof buffer), + 1316); - char pattern[4] = {1, 2, 3, 4}; + EXPECT_EQ(memcmp(pattern, buffer, sizeof pattern), 0); - ASSERT_EQ(srt_recvmsg(accepted_sock, buffer, sizeof buffer), - 1316); + // XXX There is a possibility that a broken socket can be closed automatically, + // just the srt_close() call would simply return error in case of nonexistent + // socket. Therefore close them both at once; this problem needs to be fixed + // separately. + // + // The test only intends to send one portion of data from the client, so once + // received, the client has nothing more to do and should exit. + std::cout << "[T/S] closing client socket\n"; + client_sock.close(); + std::cout << "[T/S] closing sockets: ACP:@" << accepted_sock << "...\n"; + } + // client_sock closed through UniqueSocket. + // cannot close client_sock after srt_sendmsg because of issue in api.c:2346 - EXPECT_EQ(memcmp(pattern, buffer, sizeof pattern), 0); + std::cout << "[T/S] joining client async \n"; + launched.get(); + } - std::cout << "[T/S] closing sockets: ACP:@" << accepted_sock << " LSN:@" << bindsock << " CLR:@" << g_client_sock << " ...\n"; - ASSERT_NE(srt_close(accepted_sock), SRT_ERROR); - ASSERT_NE(srt_close(g_client_sock), SRT_ERROR); // cannot close g_client_sock after srt_sendmsg because of issue in api.c:2346 + static void shutdownListener(SRTSOCKET bindsock) + { + // Silently ignore. Usually it should have been checked earlier, + // and an invalid sock might be expected in particular tests. + if (bindsock == SRT_INVALID_SOCK) + return; - std::cout << "[T/S] joining client async...\n"; - launched.get(); -} + int yes = 1; + EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_RCVSYN, &yes, sizeof yes), SRT_ERROR); // for async connect + EXPECT_NE(srt_close(bindsock), SRT_ERROR); -void shutdownListener(SRTSOCKET bindsock) -{ - // Silently ignore. Usually it should have been checked earlier, - // and an invalid sock might be expected in particular tests. - if (bindsock == SRT_INVALID_SOCK) - return; + std::chrono::milliseconds check_period (100); + int credit = 400; // 10 seconds + auto then = std::chrono::steady_clock::now(); - int yes = 1; - EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_RCVSYN, &yes, sizeof yes), SRT_ERROR); // for async connect - EXPECT_NE(srt_close(bindsock), SRT_ERROR); + std::cout << "[T/S] waiting for cleanup of @" << bindsock << " up to 10s" << std::endl; + while (srt_getsockstate(bindsock) != SRTS_NONEXIST) + { + std::this_thread::sleep_for(check_period); + --credit; + if (!credit) + break; + } + auto now = std::chrono::steady_clock::now(); + auto dur = std::chrono::duration_cast(now - then); - std::chrono::milliseconds check_period (100); - int credit = 400; // 10 seconds - auto then = std::chrono::steady_clock::now(); + // Keep as single string because this tends to be mixed from 2 threads. + std::ostringstream sout; + sout << "[T/S] @" << bindsock << " dissolved after " + << (dur.count() / 1000.0) << "s" << std::endl; + std::cout << sout.str() << std::flush; - std::cout << "[T/S] waiting for cleanup of @" << bindsock << " up to 10s" << std::endl; - while (srt_getsockstate(bindsock) != SRTS_NONEXIST) - { - std::this_thread::sleep_for(check_period); - --credit; - if (!credit) - break; + EXPECT_NE(credit, 0); } - auto now = std::chrono::steady_clock::now(); - auto dur = std::chrono::duration_cast(now - then); - - // Keep as single string because this tends to be mixed from 2 threads. - std::ostringstream sout; - sout << "[T/S] @" << bindsock << " dissolved after " - << (dur.count() / 1000.0) << "s" << std::endl; - std::cout << sout.str() << std::flush; - EXPECT_NE(credit, 0); -} +private: -TEST(ReuseAddr, SameAddr1) -{ - srt::TestInit srtinit; + void setup() + { + m_server_pollid = srt_epoll_create(); + ASSERT_NE(m_server_pollid, SRT_ERROR); + } - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); + void teardown() + { + (void)srt_epoll_release(m_server_pollid); + m_server_pollid = SRT_ERROR; + } +}; - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); +TEST_F(ReuseAddr, SameAddr1) +{ SRTSOCKET bindsock_1 = createBinder("127.0.0.1", 5000, true); SRTSOCKET bindsock_2 = createListener("127.0.0.1", 5000, true); @@ -413,24 +472,14 @@ TEST(ReuseAddr, SameAddr1) s1.join(); s2.join(); - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, SameAddr2) +TEST_F(ReuseAddr, SameAddr2) { - srt::TestInit srtinit; std::string localip = GetLocalIP(AF_INET); if (localip == "") return; // DISABLE TEST if this doesn't work. - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createBinder(localip, 5000, true); SRTSOCKET bindsock_2 = createListener(localip, 5000, true); @@ -445,21 +494,11 @@ TEST(ReuseAddr, SameAddr2) testAccept(bindsock_3, localip, 5000, true); shutdownListener(bindsock_3); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, SameAddrV6) +TEST_F(ReuseAddr, SameAddrV6) { SRTST_REQUIRES(IPv6); - srt::TestInit srtinit; - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); SRTSOCKET bindsock_1 = createBinder("::1", 5000, true); SRTSOCKET bindsock_2 = createListener("::1", 5000, true); @@ -475,26 +514,15 @@ TEST(ReuseAddr, SameAddrV6) testAccept(bindsock_3, "::1", 5000, true); shutdownListener(bindsock_3); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, DiffAddr) +TEST_F(ReuseAddr, DiffAddr) { - srt::TestInit srtinit; std::string localip = GetLocalIP(AF_INET); if (localip == "") return; // DISABLE TEST if this doesn't work. - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createBinder("127.0.0.1", 5000, true); SRTSOCKET bindsock_2 = createListener(localip, 5000, true); @@ -505,14 +533,10 @@ TEST(ReuseAddr, DiffAddr) shutdownListener(bindsock_1); shutdownListener(bindsock_2); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, Wildcard) +TEST_F(ReuseAddr, Wildcard) { - srt::TestInit srtinit; #if defined(_WIN32) || defined(CYGWIN) std::cout << "!!!WARNING!!!: On Windows connection to localhost this way isn't possible.\n" "Forcing test to pass, PLEASE FIX.\n"; @@ -524,14 +548,6 @@ TEST(ReuseAddr, Wildcard) std::string localip = GetLocalIP(AF_INET); if (localip == "") return; // DISABLE TEST if this doesn't work. - - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createListener("0.0.0.0", 5000, true); // Binding a certain address when wildcard is already bound should fail. @@ -541,15 +557,11 @@ TEST(ReuseAddr, Wildcard) shutdownListener(bindsock_1); shutdownListener(bindsock_2); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, Wildcard6) +TEST_F(ReuseAddr, Wildcard6) { SRTST_REQUIRES(IPv6); - srt::TestInit srtinit; #if defined(_WIN32) || defined(CYGWIN) std::cout << "!!!WARNING!!!: On Windows connection to localhost this way isn't possible.\n" "Forcing test to pass, PLEASE FIX.\n"; @@ -567,17 +579,10 @@ TEST(ReuseAddr, Wildcard6) // performed there. std::string localip_v4 = GetLocalIP(AF_INET); - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - // This must be obligatory set before binding a socket to "::" int strict_ipv6 = 1; - SRTSOCKET bindsock_1 = prepareSocket(); + SRTSOCKET bindsock_1 = prepareServerSocket(); srt_setsockflag(bindsock_1, SRTO_IPV6ONLY, &strict_ipv6, sizeof strict_ipv6); bindListener(bindsock_1, "::", 5000, true); @@ -601,7 +606,7 @@ TEST(ReuseAddr, Wildcard6) strict_ipv6 = 0; - bindsock_1 = prepareSocket(); + bindsock_1 = prepareServerSocket(); srt_setsockflag(bindsock_1, SRTO_IPV6ONLY, &strict_ipv6, sizeof strict_ipv6); bindListener(bindsock_1, "::", 5000, true); @@ -620,33 +625,23 @@ TEST(ReuseAddr, Wildcard6) shutdownListener(bindsock_1); shutdownListener(bindsock_2); shutdownListener(bindsock_3); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, ProtocolVersion6) +TEST_F(ReuseAddr, ProtocolVersion6) { SRTST_REQUIRES(IPv6); - srt::TestInit srtinit; #if defined(_WIN32) || defined(CYGWIN) std::cout << "!!!WARNING!!!: On Windows connection to localhost this way isn't possible.\n" "Forcing test to pass, PLEASE FIX.\n"; return; #endif - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createListener("0.0.0.0", 5000, true); // We need a small interception in this one. - // createListener = prepareSocket | bindListener - SRTSOCKET bindsock_2 = prepareSocket(); + // createListener = prepareServerSocket | bindListener + SRTSOCKET bindsock_2 = prepareServerSocket(); { int yes = 1; @@ -659,32 +654,23 @@ TEST(ReuseAddr, ProtocolVersion6) shutdownListener(bindsock_1); shutdownListener(bindsock_2); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, ProtocolVersionFaux6) +TEST_F(ReuseAddr, ProtocolVersionFaux6) { SRTST_REQUIRES(IPv6); - srt::TestInit srtinit; + #if defined(_WIN32) || defined(CYGWIN) std::cout << "!!!WARNING!!!: On Windows connection to localhost this way isn't possible.\n" "Forcing test to pass, PLEASE FIX.\n"; return; #endif - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createListener("0.0.0.0", 5000, true); // We need a small interception in this one. - // createListener = prepareSocket | bindListener - SRTSOCKET bindsock_2 = prepareSocket(); + // createListener = prepareServerSocket | bindListener + SRTSOCKET bindsock_2 = prepareServerSocket(); { int no = 0; @@ -696,7 +682,4 @@ TEST(ReuseAddr, ProtocolVersionFaux6) shutdownListener(bindsock_1); shutdownListener(bindsock_2); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 844705ea6..e0454a581 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -587,7 +587,8 @@ TEST(SyncEvent, WaitForNotifyAll) /*****************************************************************************/ void* dummythread(void* param) { - *(bool*)(param) = true; + auto& thread_finished = *(srt::sync::atomic*)param; + thread_finished = true; return nullptr; }