From 59e2c9c538a297ee7357fd49fe2149dd034d5cb1 Mon Sep 17 00:00:00 2001 From: Sektor van Skijlen Date: Tue, 17 Dec 2024 21:36:06 +0100 Subject: [PATCH 1/4] [tests] Added more tests for bonding (#3085). --- test/test_bonding.cpp | 1002 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 970 insertions(+), 32 deletions(-) diff --git a/test/test_bonding.cpp b/test/test_bonding.cpp index 54199b1d6..4ec4a84f5 100644 --- a/test/test_bonding.cpp +++ b/test/test_bonding.cpp @@ -3,15 +3,17 @@ #include #include #include -#include - #include "gtest/gtest.h" #include "test_env.h" #include "srt.h" +#include "udt.h" +#include "common.h" #include "netinet_any.h" #include "socketconfig.h" +#include "apputil.hpp" + TEST(Bonding, SRTConnectGroup) { srt::TestInit srtinit; @@ -25,7 +27,7 @@ TEST(Bonding, SRTConnectGroup) { sa.sin_family = AF_INET; sa.sin_port = htons(4200 + i); - ASSERT_EQ(inet_pton(AF_INET, "192.168.1.237", &sa.sin_addr), 1); + EXPECT_EQ(inet_pton(AF_INET, "192.168.1.237", &sa.sin_addr), 1); const SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(NULL, (struct sockaddr*)&sa, sizeof sa); targets.push_back(gd); @@ -58,7 +60,7 @@ TEST(Bonding, SRTConnectGroup) } } -#define ASSERT_SRT_SUCCESS(callform) ASSERT_NE(callform, -1) << "SRT ERROR: " << srt_getlasterror_str() +#define EXPECT_SRT_SUCCESS(callform) EXPECT_NE(callform, -1) << "SRT ERROR: " << srt_getlasterror_str() void listening_thread(bool should_read) { @@ -66,37 +68,37 @@ void listening_thread(bool should_read) sockaddr_in bind_sa; memset(&bind_sa, 0, sizeof bind_sa); bind_sa.sin_family = AF_INET; - ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &bind_sa.sin_addr), 1); + EXPECT_EQ(inet_pton(AF_INET, "127.0.0.1", &bind_sa.sin_addr), 1); bind_sa.sin_port = htons(4200); - ASSERT_SRT_SUCCESS(srt_bind(server_sock, (sockaddr*)&bind_sa, sizeof bind_sa)); + EXPECT_SRT_SUCCESS(srt_bind(server_sock, (sockaddr*)&bind_sa, sizeof bind_sa)); const int yes = 1; - ASSERT_SRT_SUCCESS(srt_setsockflag(server_sock, SRTO_GROUPCONNECT, &yes, sizeof yes)); + EXPECT_SRT_SUCCESS(srt_setsockflag(server_sock, SRTO_GROUPCONNECT, &yes, sizeof yes)); const int no = 1; - ASSERT_SRT_SUCCESS(srt_setsockflag(server_sock, SRTO_RCVSYN, &no, sizeof no)); + EXPECT_SRT_SUCCESS(srt_setsockflag(server_sock, SRTO_RCVSYN, &no, sizeof no)); const int eid = srt_epoll_create(); const int listen_event = SRT_EPOLL_IN | SRT_EPOLL_ERR; - ASSERT_SRT_SUCCESS(srt_epoll_add_usock(eid, server_sock, &listen_event)); + EXPECT_SRT_SUCCESS(srt_epoll_add_usock(eid, server_sock, &listen_event)); - ASSERT_SRT_SUCCESS(srt_listen(server_sock, 5)); + EXPECT_SRT_SUCCESS(srt_listen(server_sock, 5)); std::cout << "Listen: wait for acceptability\n"; int fds[2]; int fds_len = 2; int ers[2]; int ers_len = 2; - ASSERT_SRT_SUCCESS(srt_epoll_wait(eid, fds, &fds_len, ers, &ers_len, 5000, + EXPECT_SRT_SUCCESS(srt_epoll_wait(eid, fds, &fds_len, ers, &ers_len, 5000, 0, 0, 0, 0)); std::cout << "Listen: reported " << fds_len << " acceptable and " << ers_len << " errors\n"; - ASSERT_GT(fds_len, 0); - ASSERT_EQ(fds[0], server_sock); + EXPECT_GT(fds_len, 0); + EXPECT_EQ(fds[0], server_sock); srt::sockaddr_any scl; int acp = srt_accept(server_sock, (scl.get()), (&scl.len)); - ASSERT_SRT_SUCCESS(acp); - ASSERT_NE(acp & SRTGROUP_MASK, 0); + EXPECT_SRT_SUCCESS(acp); + EXPECT_NE(acp & SRTGROUP_MASK, 0); if (should_read) { @@ -124,11 +126,22 @@ void listening_thread(bool should_read) // srt_accept.. } -void ConnectCallback(void* /*opaq*/, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token) +SRTSOCKET g_listen_socket = -1; +int g_nconnected = 0; +int g_nfailed = 0; + +// This ConnectCallback is mainly informative, but it also collects the +// number of succeeded and failed links. +void ConnectCallback(void* , SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token) { std::cout << "Connect callback. Socket: " << sock << ", error: " << error << ", token: " << token << '\n'; + + if (error == SRT_SUCCESS) + ++g_nconnected; + else + ++g_nfailed; } TEST(Bonding, NonBlockingGroupConnect) @@ -140,20 +153,20 @@ TEST(Bonding, NonBlockingGroupConnect) std::cout << "Created group socket: " << ss << '\n'; int no = 0; - ASSERT_NE(srt_setsockopt(ss, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode - ASSERT_NE(srt_setsockopt(ss, 0, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode + EXPECT_NE(srt_setsockopt(ss, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode + EXPECT_NE(srt_setsockopt(ss, 0, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode const int poll_id = srt_epoll_create(); // Will use this epoll to wait for srt_accept(...) const int epoll_out = SRT_EPOLL_OUT | SRT_EPOLL_ERR; - ASSERT_NE(srt_epoll_add_usock(poll_id, ss, &epoll_out), SRT_ERROR); + EXPECT_NE(srt_epoll_add_usock(poll_id, ss, &epoll_out), SRT_ERROR); srt_connect_callback(ss, &ConnectCallback, this); sockaddr_in sa; sa.sin_family = AF_INET; sa.sin_port = htons(4200); - ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + EXPECT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); sockaddr_in safail = sa; safail.sin_port = htons(4201); // port where we have no listener @@ -166,14 +179,14 @@ TEST(Bonding, NonBlockingGroupConnect) EXPECT_GT(sockid, 0) << "Socket " << 1; sa.sin_port = htons(4201); // Changing port so that second connect fails std::cout << "Socket created: " << sockid << '\n'; - ASSERT_NE(srt_epoll_add_usock(poll_id, sockid, &epoll_out), SRT_ERROR); + EXPECT_NE(srt_epoll_add_usock(poll_id, sockid, &epoll_out), SRT_ERROR); } { const int sockid = srt_connect(ss, (sockaddr*) &safail, sizeof safail); EXPECT_GT(sockid, 0) << "Socket " << 2; safail.sin_port = htons(4201); // Changing port so that second connect fails std::cout << "Socket created: " << sockid << '\n'; - ASSERT_NE(srt_epoll_add_usock(poll_id, sockid, &epoll_out), SRT_ERROR); + EXPECT_NE(srt_epoll_add_usock(poll_id, sockid, &epoll_out), SRT_ERROR); } std::cout << "Returned from connecting two sockets " << std::endl; @@ -226,8 +239,8 @@ TEST(Bonding, CloseGroupAndSocket) { srt::TestInit srtinit; - const int ss = srt_create_group(SRT_GTYPE_BROADCAST); - ASSERT_NE(ss, SRT_ERROR); + MAKE_UNIQUE_SOCK(ss, "broadcast group", srt_create_group(SRT_GTYPE_BROADCAST)); + std::cout << "Created group socket: " << ss << '\n'; int no = 0; @@ -325,7 +338,7 @@ TEST(Bonding, CloseGroupAndSocket) std::this_thread::sleep_for(std::chrono::milliseconds(300)); - EXPECT_EQ(srt_close(ss), 0) << "srt_close: %s\n" << srt_getlasterror_str(); + ss.close(); std::cout << "CLOSED GROUP. Now waiting for sender to exit...\n"; sender.join(); @@ -422,10 +435,10 @@ TEST(Bonding, Options) EXPECT_NE(srt_setsockflag(lsn, SRTO_PASSPHRASE, pass.c_str(), pass.size()), SRT_ERROR); #endif int allow = 1; - ASSERT_NE(srt_setsockflag(lsn, SRTO_GROUPCONNECT, &allow, sizeof allow), SRT_ERROR); - sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET); - ASSERT_NE(srt_bind(lsn, sa.get(), sa.size()), SRT_ERROR); - ASSERT_NE(srt_listen(lsn, 1), SRT_ERROR); + EXPECT_NE(srt_setsockflag(lsn, SRTO_GROUPCONNECT, &allow, sizeof allow), SRT_ERROR); + sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET); + EXPECT_NE(srt_bind(lsn, sa.get(), sa.size()), SRT_ERROR); + EXPECT_NE(srt_listen(lsn, 1), SRT_ERROR); started = true; // First wait - until it's let go with accepting @@ -433,7 +446,7 @@ TEST(Bonding, Options) sockaddr_any revsa; SRTSOCKET gs = srt_accept(lsn, revsa.get(), &revsa.len); - ASSERT_NE(gs, SRT_ERROR); + EXPECT_NE(gs, SRT_ERROR); check_streamid(gs); @@ -462,7 +475,7 @@ TEST(Bonding, Options) } // Now the thread is accepting, so we call the connect. - sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET); + sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET); SRTSOCKET member = srt_connect(grp, sa.get(), sa.size()); // We've released the mutex and signaled the CV, so accept should proceed now. @@ -556,7 +569,7 @@ TEST(Bonding, InitialFailure) int allow = 1; ASSERT_NE(srt_setsockflag(lsn, SRTO_GROUPCONNECT, &allow, sizeof allow), SRT_ERROR); - sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET); + sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET); ASSERT_NE(srt_bind(lsn, sa.get(), sa.size()), SRT_ERROR); ASSERT_NE(srt_listen(lsn, 5), SRT_ERROR); @@ -613,3 +626,928 @@ TEST(Bonding, InitialFailure) srt_close(lsn); } + + +// General idea: +// This should try to connect to two nonexistent links, +// the connecting function (working in blocking mode) +// should exit with error, after the group has been closed +// in a separate thread. +// +// Steps: +// 1. Create group +// 2. Use a nonexistent endpoints 192.168.1.237:4200 and *:4201 +// 3. Close the group in a thread +// 4. Wait for error +TEST(Bonding, ConnectBlind) +{ + using namespace srt; + + TestInit srtinit; + MAKE_UNIQUE_SOCK(ss, "GrpCaller", srt_create_group(SRT_GTYPE_BROADCAST)); + + std::vector targets; + for (int i = 0; i < 2; ++i) + { + sockaddr_any sa = srt::CreateAddr("192.168.1.237", 4200 + i, AF_INET); + const SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(NULL, sa.get(), sa.size()); + targets.push_back(gd); + } + + std::future closing_promise = std::async(std::launch::async, [](int s) { + std::this_thread::sleep_for(std::chrono::seconds(2)); + std::cerr << "Closing group" << std::endl; + srt_close(s); + }, ss); + + std::cout << "srt_connect_group calling " << std::endl; + const int st = srt_connect_group(ss, targets.data(), targets.size()); + std::cout << "srt_connect_group returned " << st << std::endl; + + closing_promise.wait(); + EXPECT_EQ(st, -1); + + // Delete config objects before prospective exception + for (auto& gd: targets) + srt_delete_config(gd.config); + + int res = srt_close(ss); + if (res == SRT_ERROR) + { + std::cerr << "srt_close: " << srt_getlasterror_str() << std::endl; + } +} + +// TEST IDEA: +// This uses srt_connect_group in non-blocking mode. The listener +// is also created to respond to the connection. Expected is to +// continue the connecting in background and report a success, +// and report the epoll IN on listener for the first connection, +// and UPDATE For the second one. +// +// TEST STEPS: +// 1. Create a listener socket and a group. +// 2. Set the group and the listener socket non-blocking mode +// 3. Start the accepting thread +// - wait for IN event ready on the listener socket +// - accept a connection +// - wait for UPDATE event ready on the listener socket +// - wait for any event up to 5s (possibly ERR) +// - close the listener socket +// 4. Prepare two connections and start connecting +// 5. Wait for the OUT readiness event on the group +// 6. Close the group. +// 7. Join the thread +TEST(Bonding, ConnectNonBlocking) +{ + using namespace std; + using namespace std::chrono; + using namespace srt; + + TestInit srtinit; + + const string ADDR = "127.0.0.1"; + const int PORT = 4209; + + // NOTE: Add more group types, if implemented! + vector types { SRT_GTYPE_BROADCAST, SRT_GTYPE_BACKUP }; + + for (const auto GTYPE: types) + { + g_listen_socket = srt_create_socket(); + sockaddr_in bind_sa; + memset(&bind_sa, 0, sizeof bind_sa); + bind_sa.sin_family = AF_INET; + EXPECT_EQ(inet_pton(AF_INET, ADDR.c_str(), &bind_sa.sin_addr), 1); + bind_sa.sin_port = htons(PORT); + + EXPECT_NE(srt_bind(g_listen_socket, (sockaddr*)&bind_sa, sizeof bind_sa), -1); + const int yes = 1; + srt_setsockflag(g_listen_socket, SRTO_GROUPCONNECT, &yes, sizeof yes); + EXPECT_NE(srt_listen(g_listen_socket, 5), -1); + + int lsn_eid = srt_epoll_create(); + int lsn_events = SRT_EPOLL_IN | SRT_EPOLL_ERR | SRT_EPOLL_UPDATE; + srt_epoll_add_usock(lsn_eid, g_listen_socket, &lsn_events); + + // Caller part + + const int ss = srt_create_group(GTYPE); + EXPECT_NE(ss, SRT_ERROR); + std::cout << "Created group socket: " << ss << '\n'; + + int no = 0; + EXPECT_NE(srt_setsockopt(ss, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode + EXPECT_NE(srt_setsockopt(ss, 0, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // non-blocking mode + + const int poll_id = srt_epoll_create(); + // Will use this epoll to wait for srt_accept(...) + const int epoll_out = SRT_EPOLL_OUT | SRT_EPOLL_ERR; + EXPECT_NE(srt_epoll_add_usock(poll_id, ss, &epoll_out), SRT_ERROR); + + srt_connect_callback(ss, &ConnectCallback, this); + + sockaddr_in sa; + sa.sin_family = AF_INET; + sa.sin_port = htons(PORT); + EXPECT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + + //srt_setloglevel(LOG_DEBUG); + + auto acthr = std::thread([&lsn_eid]() { + SRT_EPOLL_EVENT ev[3]; + + ThreadName::set("TEST_A"); + + cout << "[A] Waiting for accept\n"; + + // This can wait in infinity; worst case it will be killed in process. + int uwait_res = srt_epoll_uwait(lsn_eid, ev, 3, -1); + EXPECT_EQ(uwait_res, 1); + EXPECT_EQ(ev[0].fd, g_listen_socket); + + // Check if the IN event is set, even if it's not the only event + const int ev_in_bit = SRT_EPOLL_IN; + EXPECT_NE(ev[0].events & ev_in_bit, 0); + bool have_also_update = ev[0].events & SRT_EPOLL_UPDATE; + + sockaddr_any adr; + int accept_id = srt_accept(g_listen_socket, adr.get(), &adr.len); + + // Expected: group reporting + EXPECT_NE(accept_id & SRTGROUP_MASK, 0); + + if (have_also_update) + { + cout << "[A] NOT waiting for update - already reported previously\n"; + } + else + { + cout << "[A] Waiting for update\n"; + // Now another waiting is required and expected the update event + // Wait up to 5s to avoid hangup in case of error + uwait_res = srt_epoll_uwait(lsn_eid, ev, 3, 5000); + EXPECT_EQ(uwait_res, 1); + EXPECT_EQ(ev[0].fd, g_listen_socket); + EXPECT_EQ(ev[0].events, (int)SRT_EPOLL_UPDATE); + } + + cout << "[A] Waitig for close (up to 5s)\n"; + // Wait up to 5s for an error + srt_epoll_uwait(lsn_eid, ev, 3, 5000); + + srt_close(accept_id); + cout << "[A] thread finished\n"; + }); + + cout << "Connecting two sockets\n"; + + SRT_SOCKGROUPCONFIG cc[2]; + cc[0] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[1] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + + EXPECT_NE(srt_epoll_add_usock(poll_id, ss, &epoll_out), SRT_ERROR); + + int result = srt_connect_group(ss, cc, 2); + EXPECT_NE(result, -1); + char data[4] = { 1, 2, 3, 4}; + cout << "Sending...\n"; + int wrong_send = srt_send(ss, data, sizeof data); + cout << "Getting error...\n"; + int errorcode = srt_getlasterror(NULL); + EXPECT_EQ(wrong_send, -1); + EXPECT_EQ(errorcode, SRT_EASYNCSND) << "REAL ERROR: " << srt_getlasterror_str(); + + // Wait up to 2s + SRT_EPOLL_EVENT ev[3]; + const int uwait_result = srt_epoll_uwait(poll_id, ev, 3, 2000); + std::cout << "Returned from connecting two sockets " << std::endl; + + EXPECT_EQ(uwait_result, 1); // Expect the group reported + EXPECT_EQ(ev[0].fd, ss); + + // One second to make sure that both links are connected. + this_thread::sleep_for(seconds(1)); + + EXPECT_EQ(srt_close(ss), 0); + acthr.join(); + + srt_epoll_release(lsn_eid); + srt_epoll_release(poll_id); + + srt_close(g_listen_socket); + } + +} + +// TEST IDEA: +// In this test there is created a working listener socket to +// accept the connection, and we use a Backup-type group with +// two links, but different weights. We connect them both and +// make sure that both are ready for use. Then we send a packet +// over the group and see, which link got activated and which +// remained idle. Expected is to have the link with higher +// priority (greater weight) to be activated. +// +// TEST STEPS: +// 1. Create a listener socket and a group. +// 3. Start the accepting thread +// - accept a connection +// - read one packet from the accepted entity +// - close the listener socket +// 4. Prepare two connections (one with weight=1) and connect the group +// 5. Wait for having all links connected +// 6. Send one packet and check which link was activated +// 6. Close the group. +// 7. Join the thread +TEST(Bonding, BackupPriorityBegin) +{ + using namespace std; + using namespace std::chrono; + using namespace srt; + + TestInit srtinit; + + g_nconnected = 0; + g_nfailed = 0; + + g_listen_socket = srt_create_socket(); + ASSERT_NE(g_listen_socket, SRT_INVALID_SOCK); + sockaddr_in bind_sa; + memset(&bind_sa, 0, sizeof bind_sa); + bind_sa.sin_family = AF_INET; + EXPECT_EQ(inet_pton(AF_INET, "127.0.0.1", &bind_sa.sin_addr), 1); + bind_sa.sin_port = htons(4200); + + EXPECT_NE(srt_bind(g_listen_socket, (sockaddr*)&bind_sa, sizeof bind_sa), -1); + const int yes = 1; + srt_setsockflag(g_listen_socket, SRTO_GROUPCONNECT, &yes, sizeof yes); + EXPECT_NE(srt_listen(g_listen_socket, 5), -1); + + // Caller part + + const int ss = srt_create_group(SRT_GTYPE_BACKUP); + EXPECT_NE(ss, SRT_ERROR); + + srt_connect_callback(ss, &ConnectCallback, this); + + sockaddr_in sa; + sa.sin_family = AF_INET; + sa.sin_port = htons(4200); + EXPECT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + + auto acthr = std::thread([]() { + sockaddr_any adr; + cout << "[A] Accepting a connection...\n"; + int accept_id = srt_accept(g_listen_socket, adr.get(), &adr.len); + + // Expected: group reporting + EXPECT_NE(accept_id & SRTGROUP_MASK, 0); + + SRT_SOCKGROUPDATA gdata[2]; + SRT_MSGCTRL mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + long long data[1320/8]; + + cout << "[A] Receiving...\n"; + int ds = srt_recvmsg2(accept_id, (char*)data, sizeof data, (&mc)); + EXPECT_EQ(ds, 8); + + cout << "[A] Closing\n"; + srt_close(accept_id); + cout << "[A] thread finished\n"; + }); + + cout << "Connecting two sockets\n"; + + SRT_SOCKGROUPCONFIG cc[2]; + cc[0] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[0].token = 0; + cc[1] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[1].token = 1; + cc[1].weight = 1; // higher than the default 0 + + int result = srt_connect_group(ss, cc, 2); + EXPECT_GT(result, 0); // blocking mode, first connection = returns Socket ID + + // Make sure both links are connected + SRT_SOCKGROUPDATA gdata[2]; + size_t psize = 2; + size_t nwait = 10; + cout << "Waiting for getting 2 links:\n"; + while (--nwait) + { + srt_group_data(ss, gdata, &psize); + if (psize == 2) + { + int l1, l2; + l1 = gdata[0].memberstate; + l2 = gdata[1].memberstate; + + if (l1 > SRT_GST_PENDING && l2 > SRT_GST_PENDING) + { + cout << "Both up: [0]=" << l1 << " [1]=" << l2 << "\n"; + break; + } + else + { + cout << "Still link states [0]=" << l1 << " [1]=" << l2 << "\n"; + } + } + else + { + cout << "Still " << psize << endl; + } + this_thread::sleep_for(milliseconds(500)); + } + EXPECT_NE(nwait, size_t()); + + // Now send one packet + long long data = 0x1234123412341234; + + SRT_MSGCTRL mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + + // This call should retrieve the group information + // AFTER the transition has happened + int sendret = srt_sendmsg2(ss, (char*)&data, sizeof data, (&mc)); + EXPECT_EQ(sendret, int(sizeof data)); + + // So, let's check which link is in RUNNING state + // TOKEN value is the index in cc array, and we should + // also have the weight value there. + + SRT_SOCKGROUPDATA* mane, * backup; + if (gdata[0].weight == 0) + { + backup = &gdata[0]; + mane = &gdata[1]; + } + else + { + mane = &gdata[0]; + backup = &gdata[1]; + } + + cout << "MAIN:[" << mane->token << "] weight=" << mane->weight << endl; + cout << "BACKUP:[" << backup->token << "] weight=" << backup->weight << endl; + + // Ok, now mane link should be active, backup idle + EXPECT_EQ(mane->memberstate, SRT_GST_RUNNING); + EXPECT_EQ(backup->memberstate, SRT_GST_IDLE); + + acthr.join(); +} + + +// TEST IDEA: +// In this test there is created a working listener socket to +// accept the connection, and we use a Backup-type group with +// two links, but different weights. We connect the first link +// with less weight and send one packet to make sure this only +// link was activated. Then we connect a second link with weight=1. +// Then we send the packet again and see if the new link was +// immediately activated. The first link should be silenced after +// time, but there's no possibility to check this in such a +// simple test. +// +// TEST STEPS: +// 1. Create a listener socket and a group. +// 3. Start the accepting thread +// - accept a connection +// - read one packet from the accepted entity +// - read the second packet from the accepted entity +// - close the listener socket +// 4. Prepare one connection with weight=0 and connect the group +// 5. Send a packet to enforce activation of one link +// 6. Prepare another connection with weight=1 and connect the group +// 7. Send a packet +// 8. Check member status - both links should be running. +// 9. Close the group. +// 10. Join the thread +TEST(Bonding, BackupPriorityTakeover) +{ + using namespace std; + using namespace std::chrono; + using namespace srt; + + TestInit srtinit; + + g_nconnected = 0; + g_nfailed = 0; + + g_listen_socket = srt_create_socket(); + ASSERT_NE(g_listen_socket, SRT_INVALID_SOCK); + sockaddr_in bind_sa; + memset(&bind_sa, 0, sizeof bind_sa); + bind_sa.sin_family = AF_INET; + EXPECT_EQ(inet_pton(AF_INET, "127.0.0.1", &bind_sa.sin_addr), 1); + bind_sa.sin_port = htons(4200); + + EXPECT_NE(srt_bind(g_listen_socket, (sockaddr*)&bind_sa, sizeof bind_sa), -1); + const int yes = 1; + srt_setsockflag(g_listen_socket, SRTO_GROUPCONNECT, &yes, sizeof yes); + EXPECT_NE(srt_listen(g_listen_socket, 5), -1); + + // Caller part + + const int ss = srt_create_group(SRT_GTYPE_BACKUP); + EXPECT_NE(ss, SRT_ERROR); + + srt_connect_callback(ss, &ConnectCallback, this); + + sockaddr_in sa; + sa.sin_family = AF_INET; + sa.sin_port = htons(4200); + EXPECT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + + auto acthr = std::thread([]() { + sockaddr_any adr; + cout << "[A] Accepting a connection...\n"; + int accept_id = srt_accept(g_listen_socket, adr.get(), &adr.len); + + // Expected: group reporting + EXPECT_NE(accept_id & SRTGROUP_MASK, 0); + + SRT_SOCKGROUPDATA gdata[2]; + SRT_MSGCTRL mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + long long data[1320/8]; + + cout << "[A] Receiving 1...\n"; + int ds = srt_recvmsg2(accept_id, (char*)data, sizeof data, (&mc)); + EXPECT_EQ(ds, 8); + + cout << "[A] Receiving 2...\n"; + ds = srt_recvmsg2(accept_id, (char*)data, sizeof data, (&mc)); + EXPECT_EQ(ds, 8); + + // To make it possible that the state is checked before it is closed. + this_thread::sleep_for(seconds(1)); + + cout << "[A] Closing\n"; + srt_close(accept_id); + cout << "[A] thread finished\n"; + }); + + cout << "Connecting first link weight=0:\n"; + + SRT_SOCKGROUPCONFIG cc[2]; + cc[0] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[0].token = 0; + + int result = srt_connect_group(ss, cc, 1); + EXPECT_GT(result, 0); // connect with only one element returns socket ID + + // As we have one link, after `srt_connect_group` returns, we have + // this link now connected. Send one data portion. + + SRT_SOCKGROUPDATA gdata[2]; + + long long data = 0x1234123412341234; + SRT_MSGCTRL mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + + cout << "Sending (1)\n"; + // This call should retrieve the group information + // AFTER the transition has happened + int sendret = srt_sendmsg2(ss, (char*)&data, sizeof data, (&mc)); + EXPECT_EQ(sendret, int(sizeof data)); + EXPECT_EQ(mc.grpdata_size, size_t(1)); + EXPECT_EQ(gdata[0].memberstate, SRT_GST_RUNNING); + + cout << "Connecting second link weight=1:\n"; + // Now prepare the second connection + cc[0].token = 1; + cc[0].weight = 1; // higher than the default 0 + result = srt_connect_group(ss, cc, 1); + EXPECT_GT(result, 0); // connect with only one element returns socket ID + + // Make sure both links are connected + size_t psize = 2; + size_t nwait = 10; + cout << "Waiting for getting 2 links:\n"; + while (--nwait) + { + srt_group_data(ss, gdata, &psize); + if (psize == 2) + { + int l1, l2; + l1 = gdata[0].memberstate; + l2 = gdata[1].memberstate; + + if (l1 > SRT_GST_PENDING && l2 > SRT_GST_PENDING) + { + cout << "Both up: [0]=" << l1 << " [1]=" << l2 << "\n"; + break; + } + else + { + cout << "Still link states [0]=" << l1 << " [1]=" << l2 << "\n"; + } + } + else + { + cout << "Still " << psize << endl; + } + this_thread::sleep_for(milliseconds(500)); + } + EXPECT_NE(nwait, size_t(0)); + + // Now send one packet (again) + mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + + cout << "Sending (2)\n"; + // This call should retrieve the group information + // AFTER the transition has happened + sendret = srt_sendmsg2(ss, (char*)&data, sizeof data, (&mc)); + EXPECT_EQ(sendret, int(sizeof data)); + + // So, let's check which link is in RUNNING state + // TOKEN value is the index in cc array, and we should + // also have the weight value there. + + SRT_SOCKGROUPDATA* mane, * backup; + if (gdata[0].weight == 0) + { + backup = &gdata[0]; + mane = &gdata[1]; + } + else + { + mane = &gdata[0]; + backup = &gdata[1]; + } + + cout << "MAIN:[" << mane->token << "] weight=" << mane->weight << endl; + cout << "BACKUP:[" << backup->token << "] weight=" << backup->weight << endl; + + // Ok, now both links should be running (this state lasts + // for the "temporary activation" period. + EXPECT_EQ(mane->memberstate, SRT_GST_RUNNING); + EXPECT_EQ(backup->memberstate, SRT_GST_RUNNING); + + acthr.join(); +} + + +// TEST IDEA: +// In this test there is created a working listener socket to +// accept the connection, and we use a Backup-type group with +// two links, but different weights. We connect then two links +// both with weight=1. Then we send a packet to make sure that +// exactly one of them got activated. Then we connect another +// link with weight=0. Then we send a packet again, which should +// not change the link usage. Then we check which link was +// active so far, and we close the socket for that link to make +// it broken, then we wait for having only two links connected. +// Then a packet is sent to activate a link. We expect the link +// with higher weight is activated. +// +// TEST STEPS: +// 1. Create a listener socket. +// 2. Create and setup a group. +// 3. Start the accepting thread +// A1. accept a connection +// A2. read one packet from the accepted entity +// A3. read the second packet from the accepted entity +// A4. read the third packet from the accepted entity +// A5. close the listener socket +// 4. Prepare two connections with weight=1 and connect the group +// 5. Send a packet to enforce activation of one link +// 6. Prepare another connection with weight=0 and connect the group +// 7. Wait for having all 3 links connected. +// 8. Send a packet +// 9. Find which link is currently active and close it +// 10. Wait for having only two links. +// 11. Send a packet. +// 12. Find one link active and one idle +// 13. Check if the link with weight=1 is active and the one with weight=0 is idle. +// 14. Close the group. +// 15. Join the thread +TEST(Bonding, BackupPrioritySelection) +{ + using namespace std; + using namespace std::chrono; + using namespace srt; + + TestInit srtinit; + + g_nconnected = 0; + g_nfailed = 0; + volatile bool recvd = false; + + // 1. + sockaddr_in bind_sa; + memset(&bind_sa, 0, sizeof bind_sa); + bind_sa.sin_family = AF_INET; + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &bind_sa.sin_addr), 1); + bind_sa.sin_port = htons(4200); + + g_listen_socket = srt_create_socket(); + ASSERT_NE(g_listen_socket, SRT_INVALID_SOCK); + EXPECT_NE(srt_bind(g_listen_socket, (sockaddr*)&bind_sa, sizeof bind_sa), -1); + const int yes = 1; + srt_setsockflag(g_listen_socket, SRTO_GROUPCONNECT, &yes, sizeof yes); + EXPECT_NE(srt_listen(g_listen_socket, 5), -1); + + // Caller part + // 2. + const int ss = srt_create_group(SRT_GTYPE_BACKUP); + EXPECT_NE(ss, SRT_ERROR); + + srt_connect_callback(ss, &ConnectCallback, this); + + // Set the group's stability timeout to 1s, otherwise it will + // declare the links unstable by not receiving ACKs. + int stabtimeo = 1000; + srt_setsockflag(ss, SRTO_GROUPMINSTABLETIMEO, &stabtimeo, sizeof stabtimeo); + + //srt_setloglevel(LOG_DEBUG); + srt::resetlogfa( std::set { + SRT_LOGFA_GRP_SEND, + SRT_LOGFA_GRP_MGMT, + SRT_LOGFA_CONN + }); + + sockaddr_in sa; + sa.sin_family = AF_INET; + sa.sin_port = htons(4200); + EXPECT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + + // 3. + auto acthr = std::thread([&recvd]() { + sockaddr_any adr; + cout << "[A1] Accepting a connection...\n"; + + // A1 + int accept_id = srt_accept(g_listen_socket, adr.get(), &adr.len); + + // Expected: group reporting + EXPECT_NE(accept_id & SRTGROUP_MASK, 0); + + SRT_SOCKGROUPDATA gdata[2]; + SRT_MSGCTRL mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + long long data[1320/8]; + + // A2 + cout << "[A2] Receiving 1...\n"; + int ds = srt_recvmsg2(accept_id, (char*)data, sizeof data, (&mc)); + if (ds == -1) { cout << "[A2] ERROR: " << srt_getlasterror(NULL) << " " << srt_getlasterror_str() << endl; } + EXPECT_EQ(ds, 8); + + // A3 + cout << "[A3] Receiving 2...\n"; + ds = srt_recvmsg2(accept_id, (char*)data, sizeof data, (&mc)); + if (ds == -1) { cout << "[A3] ERROR: " << srt_getlasterror(NULL) << " " << srt_getlasterror_str() << endl; } + EXPECT_EQ(ds, 8); + recvd = true; + + // A4 + cout << "[A4] Receiving 3...\n"; + ds = srt_recvmsg2(accept_id, (char*)data, sizeof data, (&mc)); + if (ds == -1) { cout << "[A4] ERROR: " << srt_getlasterror(NULL) << " " << srt_getlasterror_str() << endl; } + EXPECT_EQ(ds, 8); + + cout << "[A] Waiting 5s...\n"; + // To make it possible that the state is checked before it is closed. + this_thread::sleep_for(seconds(5)); + + // A5 + cout << "[A5] Closing\n"; + srt_close(accept_id); + cout << "[A] thread finished\n"; + }); + + + cout << "(4) Connecting first 2 links weight=1:\n"; + + SRT_SOCKGROUPCONFIG cc[2]; + cc[0] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[0].token = 0; + cc[0].weight = 1; + cc[1] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[1].token = 1; + cc[1].weight = 1; + + // 4. + int result = srt_connect_group(ss, cc, 2); + EXPECT_GT(result, 0); // BLOCKING MODE, always returns the socket value + + // As we have one link, after `srt_connect_group` returns, we have + // this link now connected. Send one data portion. + + SRT_SOCKGROUPDATA gdata[3]; + memset(gdata, 0, sizeof(gdata)); + + long long data = 0x1234123412341234; + SRT_MSGCTRL mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 3; + + // We can send now. We know that we have at least one + // link connected and it already has the same priority + // as the other. + + //srt_setloglevel(LOG_DEBUG); + // 5. + cout << "(5) Sending (1)\n"; + // This call should retrieve the group information + // AFTER the transition has happened + int sendret = srt_sendmsg2(ss, (char*)&data, sizeof data, (&mc)); + // In case when this was an error, display the code + if (sendret == -1) { cout << "[A4] ERROR: " << srt_getlasterror(NULL) << " " << srt_getlasterror_str() << endl; } + + EXPECT_EQ(sendret, int(sizeof data)); + + EXPECT_EQ(mc.grpdata_size, size_t(2)); + + int state0 = gdata[0].memberstate; + int state1 = gdata[1].memberstate; + + cout << "States: [0]=" << state0 << " [1]=" << state1 << endl; + EXPECT_TRUE(state0 == SRT_GST_RUNNING || state1 == SRT_GST_RUNNING); + + // 6. + cout << "(6) Connecting third link weight=0:\n"; + // Now prepare the third connection + cc[0] = srt_prepare_endpoint(NULL, (sockaddr*)&sa, sizeof sa); + cc[0].token = 2; + cc[0].weight = 0; // higher than the default 0 + result = srt_connect_group(ss, cc, 1); + EXPECT_GE(result, 0); // ONE connection only - will return socket id + + // Make sure all 3 links are connected + size_t psize = 3; + size_t nwait = 10; + set states; + + // 7. + cout << "(7) Waiting for getting 3 links:\n"; + while (--nwait) + { + srt_group_data(ss, gdata, &psize); + if (psize == 3) + { + states.clear(); + for (int i = 0; i < 3; ++i) + states.insert(gdata[i].memberstate); + + if (states.count(SRT_GST_PENDING)) + { + cout << "Still not all links...\n"; + } + else + { + cout << "All links up\n"; + break; + } + } + else + { + cout << "Still " << psize << endl; + } + this_thread::sleep_for(milliseconds(500)); + } + EXPECT_NE(nwait, size_t(0)); + + // Now send one packet (again) + mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 3; + + // 8. + cout << "(8) Sending (2)\n"; + // This call should retrieve the group information + // AFTER the transition has happened + sendret = srt_sendmsg2(ss, (char*)&data, sizeof data, (&mc)); + EXPECT_EQ(sendret, int(sizeof data)); + EXPECT_EQ(mc.grpdata_size, size_t(3)); + + // So, let's check which link is in RUNNING state + // TOKEN value is the index in cc array, and we should + // also have the weight value there. + + SRT_SOCKGROUPDATA* mane = nullptr; + + for (size_t i = 0; i < mc.grpdata_size; ++i) + { + if (gdata[i].memberstate == SRT_GST_RUNNING) + { + mane = &gdata[i]; + break; + } + } + + EXPECT_NE(mane, nullptr); + EXPECT_EQ(mane->weight, 1); + + // Spin-wait for making sure the reception succeeded before + // closing. This shouldn't be a problem in general, but + int ntry = 100; + while (!recvd && --ntry) + this_thread::sleep_for(milliseconds(200)); + EXPECT_NE(ntry, 0); + + cout << "(9) Found activated link: [" << mane->token << "] - closing after 0.5s...\n"; + + // Waiting is to make sure that the listener thread has received packet 3. + this_thread::sleep_for(milliseconds(500)); + EXPECT_NE(srt_close(mane->id), -1); + + // Now expect to have only 2 links, wait for it if needed. + psize = 2; + nwait = 10; + + cout << "(10) Waiting for ONLY 2 links:\n"; + while (--nwait) + { + srt_group_data(ss, gdata, &psize); + if (psize == 2) + { + break; + } + else + { + cout << "Still " << psize << endl; + } + this_thread::sleep_for(milliseconds(500)); + } + EXPECT_NE(nwait, size_t(0)); + + // Now send one packet (again) + mc = srt_msgctrl_default; + mc.grpdata = gdata; + mc.grpdata_size = 2; + + cout << "(11) Sending (3)\n"; + // This call should retrieve the group information + // AFTER the transition has happened + sendret = srt_sendmsg2(ss, (char*)&data, sizeof data, (&mc)); + EXPECT_EQ(sendret, int(sizeof data)); + + cout << "(sleep)\n"; + this_thread::sleep_for(seconds(1)); + + mane = nullptr; + SRT_SOCKGROUPDATA* backup = nullptr; + cout << "(12) Checking main/backup:"; + int repeat_check = 1; // 50; +CheckLinksAgain: + for (size_t i = 0; i < mc.grpdata_size; ++i) + { + cout << "[" << i << "]" << srt_logging::MemberStatusStr(gdata[i].memberstate) + << " weight=" << gdata[i].weight; + if (gdata[i].memberstate == SRT_GST_RUNNING) + { + cout << " (main) "; + mane = &gdata[i]; + } + else + { + cout << " (backup) "; + backup = &gdata[i]; + } + } + if (backup == nullptr) + { + if (--repeat_check) + { + cout << "BACKUP STILL RUNNING. AGAIN\n"; + this_thread::sleep_for(milliseconds(250)); + goto CheckLinksAgain; + } + } + cout << endl; + + EXPECT_NE(mane, nullptr); + EXPECT_NE(backup, nullptr); + EXPECT_EQ(mane->weight, 1); + EXPECT_EQ(backup->weight, 0); + + cout << "MAIN (expected active):[" << mane->token << "] weight=" << mane->weight << endl; + cout << "BACKUP (expected idle):[" << backup->token << "] weight=" << backup->weight << endl; + + // Ok, now both links should be running (this state lasts + // for the "temporary activation" period. + EXPECT_EQ(mane->memberstate, SRT_GST_RUNNING); + EXPECT_EQ(backup->memberstate, SRT_GST_IDLE); + + this_thread::sleep_for(seconds(1)); + + cout << "Closing receiver thread [A]\n"; + + acthr.join(); + + srt_close(ss); +} + + From b25263783d2abd266be71a31b2ed15e10e4795f0 Mon Sep 17 00:00:00 2001 From: Sektor van Skijlen Date: Wed, 18 Dec 2024 17:02:07 +0100 Subject: [PATCH 2/4] [core] Make field for bandwidth atomic to avoid thread clash (#3093). --- srtcore/congctl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srtcore/congctl.cpp b/srtcore/congctl.cpp index 85d4cda97..9bc43db8b 100644 --- a/srtcore/congctl.cpp +++ b/srtcore/congctl.cpp @@ -60,7 +60,7 @@ void SrtCongestion::Check() class LiveCC: public SrtCongestionControlBase { - int64_t m_llSndMaxBW; //Max bandwidth (bytes/sec) + srt::sync::atomic m_llSndMaxBW; //Max bandwidth (bytes/sec) srt::sync::atomic m_zSndAvgPayloadSize; //Average Payload Size of packets to xmit size_t m_zMaxPayloadSize; size_t m_zHeaderSize; From 1ec61fc04db843ead4bebcb9db90cef8fbdd96d1 Mon Sep 17 00:00:00 2001 From: Sektor van Skijlen Date: Wed, 18 Dec 2024 17:04:23 +0100 Subject: [PATCH 3/4] [core] Apply conditional to setting a macro for IPv6 on Mac (#3092). Define __APPLE_USE_RFC_3542 if not defined. --- srtcore/platform_sys.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/srtcore/platform_sys.h b/srtcore/platform_sys.h index 83763e5ea..5dd2b928d 100644 --- a/srtcore/platform_sys.h +++ b/srtcore/platform_sys.h @@ -68,7 +68,9 @@ // also other macros, like TARGET_OS_IOS etc. #include "TargetConditionals.h" +#ifndef __APPLE_USE_RFC_3542 #define __APPLE_USE_RFC_3542 /* IPV6_PKTINFO */ +#endif #ifdef SRT_IMPORT_TIME #include From 8a89a3abbf4d3a2f7869d535349a474607ea0214 Mon Sep 17 00:00:00 2001 From: Sektor van Skijlen Date: Tue, 24 Dec 2024 10:41:40 +0100 Subject: [PATCH 4/4] [core][tests] Clarified conditions for setting IPTOS/TTL options. Added UT (#3096). * Added requirement for IPv6 for UDP Options test. * Updated documentation. --- CMakeLists.txt | 1 + docs/API/API-socket-options.md | 4 + srtcore/channel.cpp | 143 +++++++++++++++++++++++---------- test/test_env.h | 10 +++ test/test_reuseaddr.cpp | 23 ++++++ 5 files changed, 137 insertions(+), 44 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7ed11ef8e..981440189 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1526,6 +1526,7 @@ if (ENABLE_UNITTESTS AND ENABLE_CXX11) srt_add_program_dont_install(test-srt ${SOURCES_unittests}) srt_make_application(test-srt) target_include_directories(test-srt PRIVATE ${SSL_INCLUDE_DIRS} ${GTEST_INCLUDE_DIRS}) + target_compile_definitions(test-srt PRIVATE "-DSRT_TEST_SYSTEM_NAME=\"${CMAKE_SYSTEM_NAME}\"") target_link_libraries( test-srt diff --git a/docs/API/API-socket-options.md b/docs/API/API-socket-options.md index 3291f6e27..fed2a5b5a 100644 --- a/docs/API/API-socket-options.md +++ b/docs/API/API-socket-options.md @@ -614,6 +614,8 @@ See [`SRTO_INPUTBW`](#SRTO_INPUTBW). IPv4 Type of Service (see `IP_TOS` option for IP) or IPv6 Traffic Class (see `IPV6_TCLASS` of IPv6) depending on socket address family. Applies to sender only. +NOTE: This option has been tested to work correctly on Linux only. + When *getting*, the returned value is the user preset for non-connected sockets and the actual value for connected sockets. @@ -632,6 +634,8 @@ and the actual value for connected sockets. IPv4 Time To Live (see `IP_TTL` option for IP) or IPv6 unicast hops (see `IPV6_UNICAST_HOPS` for IPv6) depending on socket address family. Applies to sender only. +NOTE: This option has been tested to work correctly on Linux only. + When *getting*, the returned value is the user preset for non-connected sockets and the actual value for connected sockets. diff --git a/srtcore/channel.cpp b/srtcore/channel.cpp index 9967d7daf..0ec8f090f 100644 --- a/srtcore/channel.cpp +++ b/srtcore/channel.cpp @@ -289,6 +289,20 @@ void srt::CChannel::attach(UDPSOCKET udpsock, const sockaddr_any& udpsocks_addr) setUDPSockOpt(); } +static inline string fmt_opt(bool value, const string& label) +{ + string out; + out.reserve(label.size() + 2); + out = value ? "+" : "-"; + out += label; + return out; +} + +static inline string fmt_alt(bool value, const string& label, const string& unlabel) +{ + return value ? label : unlabel; +} + void srt::CChannel::setUDPSockOpt() { #if defined(SUNOS) @@ -297,13 +311,13 @@ void srt::CChannel::setUDPSockOpt() // Retrieve starting SND/RCV Buffer sizes. int startRCVBUF = 0; optSize = sizeof(startRCVBUF); - if (0 != ::getsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (void*)&startRCVBUF, &optSize)) + if (-1 == ::getsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (void*)&startRCVBUF, &optSize)) { startRCVBUF = -1; } int startSNDBUF = 0; optSize = sizeof(startSNDBUF); - if (0 != ::getsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (void*)&startSNDBUF, &optSize)) + if (-1 == ::getsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (void*)&startSNDBUF, &optSize)) { startSNDBUF = -1; } @@ -312,13 +326,12 @@ void srt::CChannel::setUDPSockOpt() // maximum value. // However, do not reduce the buffer size. const int maxsize = 64000; - if (0 != - ::setsockopt( - m_iSocket, SOL_SOCKET, SO_RCVBUF, (const char*)&m_mcfg.iUDPRcvBufSize, sizeof m_mcfg.iUDPRcvBufSize)) + if (-1 == ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, + (const char*)&m_mcfg.iUDPRcvBufSize, sizeof m_mcfg.iUDPRcvBufSize)) { int currentRCVBUF = 0; optSize = sizeof(currentRCVBUF); - if (0 != ::getsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (void*)¤tRCVBUF, &optSize)) + if (-1 == ::getsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (void*)¤tRCVBUF, &optSize)) { currentRCVBUF = -1; } @@ -327,13 +340,12 @@ void srt::CChannel::setUDPSockOpt() ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (const char*)&maxsize, sizeof maxsize); } } - if (0 != - ::setsockopt( - m_iSocket, SOL_SOCKET, SO_SNDBUF, (const char*)&m_mcfg.iUDPSndBufSize, sizeof m_mcfg.iUDPSndBufSize)) + if (-1 == ::setsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, + (const char*)&m_mcfg.iUDPSndBufSize, sizeof m_mcfg.iUDPSndBufSize)) { int currentSNDBUF = 0; optSize = sizeof(currentSNDBUF); - if (0 != ::getsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (void*)¤tSNDBUF, &optSize)) + if (-1 == ::getsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (void*)¤tSNDBUF, &optSize)) { currentSNDBUF = -1; } @@ -346,13 +358,13 @@ void srt::CChannel::setUDPSockOpt() // Retrieve ending SND/RCV Buffer sizes. int endRCVBUF = 0; optSize = sizeof(endRCVBUF); - if (0 != ::getsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (void*)&endRCVBUF, &optSize)) + if (-1 == ::getsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (void*)&endRCVBUF, &optSize)) { endRCVBUF = -1; } int endSNDBUF = 0; optSize = sizeof(endSNDBUF); - if (0 != ::getsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (void*)&endSNDBUF, &optSize)) + if (-1 == ::getsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (void*)&endSNDBUF, &optSize)) { endSNDBUF = -1; } @@ -368,90 +380,137 @@ void srt::CChannel::setUDPSockOpt() #elif defined(BSD) || TARGET_OS_MAC // BSD system will fail setsockopt if the requested buffer size exceeds system maximum value int maxsize = 64000; - if (0 != ::setsockopt( + if (-1 == ::setsockopt( m_iSocket, SOL_SOCKET, SO_RCVBUF, (const char*)&m_mcfg.iUDPRcvBufSize, sizeof m_mcfg.iUDPRcvBufSize)) ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, (const char*)&maxsize, sizeof maxsize); - if (0 != ::setsockopt( + if (-1 == ::setsockopt( m_iSocket, SOL_SOCKET, SO_SNDBUF, (const char*)&m_mcfg.iUDPSndBufSize, sizeof m_mcfg.iUDPSndBufSize)) ::setsockopt(m_iSocket, SOL_SOCKET, SO_SNDBUF, (const char*)&maxsize, sizeof maxsize); #else // for other systems, if requested is greated than maximum, the maximum value will be automactally used - if ((0 != - ::setsockopt( - m_iSocket, SOL_SOCKET, SO_RCVBUF, (const char*)&m_mcfg.iUDPRcvBufSize, sizeof m_mcfg.iUDPRcvBufSize)) || - (0 != ::setsockopt( - m_iSocket, SOL_SOCKET, SO_SNDBUF, (const char*)&m_mcfg.iUDPSndBufSize, sizeof m_mcfg.iUDPSndBufSize))) + if ((-1 == ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVBUF, + (const char*)&m_mcfg.iUDPRcvBufSize, sizeof m_mcfg.iUDPRcvBufSize)) + || + (-1 == ::setsockopt( m_iSocket, SOL_SOCKET, SO_SNDBUF, + (const char*)&m_mcfg.iUDPSndBufSize, sizeof m_mcfg.iUDPSndBufSize))) throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR); #endif + bool is_set = false; + bool adr_unspec = false, adr_mapped = false, adr_v6 = false; + if (m_BindAddr.family() == AF_INET) + { + adr_unspec = m_BindAddr.isany(); + } + else + { + adr_unspec = IN6_IS_ADDR_UNSPECIFIED(&m_BindAddr.sin6.sin6_addr); + adr_mapped = IN6_IS_ADDR_V4MAPPED(&m_BindAddr.sin6.sin6_addr); + adr_v6 = true; + } + if (m_mcfg.iIpTTL != -1) { - if (m_BindAddr.family() == AF_INET) + if (!adr_v6) { - if (0 != ::setsockopt(m_iSocket, IPPROTO_IP, IP_TTL, (const char*)&m_mcfg.iIpTTL, sizeof m_mcfg.iIpTTL)) + if (-1 == ::setsockopt(m_iSocket, IPPROTO_IP, IP_TTL, (const char*)&m_mcfg.iIpTTL, sizeof m_mcfg.iIpTTL)) + { + LOGC(kmlog.Error, log << "setsockopt(IP_TTL): " << SysStrError(NET_ERROR)); throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR); + } + is_set = true; } else { // If IPv6 address is unspecified, set BOTH IP_TTL and IPV6_UNICAST_HOPS. // For specified IPv6 address, set IPV6_UNICAST_HOPS ONLY UNLESS it's an IPv4-mapped-IPv6 - if (IN6_IS_ADDR_UNSPECIFIED(&m_BindAddr.sin6.sin6_addr) || - !IN6_IS_ADDR_V4MAPPED(&m_BindAddr.sin6.sin6_addr)) + if (adr_unspec || !adr_mapped) { - if (0 != - ::setsockopt( - m_iSocket, IPPROTO_IPV6, IPV6_UNICAST_HOPS, (const char*)&m_mcfg.iIpTTL, sizeof m_mcfg.iIpTTL)) + if (-1 == ::setsockopt( m_iSocket, IPPROTO_IPV6, IPV6_UNICAST_HOPS, + (const char*)&m_mcfg.iIpTTL, sizeof m_mcfg.iIpTTL)) { + LOGC(kmlog.Error, log << "setsockopt(IPV6_UNICAST_HOPS): " << SysStrError(NET_ERROR)); throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR); } + is_set = true; } // For specified IPv6 address, set IP_TTL ONLY WHEN it's an IPv4-mapped-IPv6 - if (IN6_IS_ADDR_UNSPECIFIED(&m_BindAddr.sin6.sin6_addr) || IN6_IS_ADDR_V4MAPPED(&m_BindAddr.sin6.sin6_addr)) + if (!is_set) // adr_mapped (because adr_unspec was handled above) { - if (0 != ::setsockopt(m_iSocket, IPPROTO_IP, IP_TTL, (const char*)&m_mcfg.iIpTTL, sizeof m_mcfg.iIpTTL)) + if (-1 == ::setsockopt(m_iSocket, IPPROTO_IP, IP_TTL, (const char*)&m_mcfg.iIpTTL, sizeof m_mcfg.iIpTTL)) { + LOGC(kmlog.Error, log << "setsockopt(IP_TTL): " << SysStrError(NET_ERROR) + << fmt_alt(adr_unspec, " (v6 unspec)", " (v6 mapped v4)")); throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR); } + is_set = true; } } + + if (!is_set) + { + LOGC(kmlog.Error, log << "srt_setsockflag(SRTO_IPTTL): No suitable condition for adr=" << m_BindAddr.str() + << " : " << fmt_opt(adr_v6, "v6 ") << fmt_opt(adr_unspec, "unspec ") << fmt_opt(adr_mapped, "mapped")); + throw CUDTException(MJ_SETUP, MN_INVAL, 0); + } } + is_set = false; if (m_mcfg.iIpToS != -1) { if (m_BindAddr.family() == AF_INET) { - if (0 != ::setsockopt(m_iSocket, IPPROTO_IP, IP_TOS, (const char*)&m_mcfg.iIpToS, sizeof m_mcfg.iIpToS)) + if (-1 == ::setsockopt(m_iSocket, IPPROTO_IP, IP_TOS, (const char*)&m_mcfg.iIpToS, sizeof m_mcfg.iIpToS)) + { + LOGC(kmlog.Error, log << "setsockopt(IP_TOS): " << SysStrError(NET_ERROR)); throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR); + } + is_set = true; } else { // If IPv6 address is unspecified, set BOTH IP_TOS and IPV6_TCLASS. + SRT_ATR_UNUSED bool using_tclass = false; #ifdef IPV6_TCLASS + using_tclass = true; // For specified IPv6 address, set IPV6_TCLASS ONLY UNLESS it's an IPv4-mapped-IPv6 - if (IN6_IS_ADDR_UNSPECIFIED(&m_BindAddr.sin6.sin6_addr) || - !IN6_IS_ADDR_V4MAPPED(&m_BindAddr.sin6.sin6_addr)) + if (adr_unspec || !adr_mapped) { - if (0 != ::setsockopt( - m_iSocket, IPPROTO_IPV6, IPV6_TCLASS, (const char*)&m_mcfg.iIpToS, sizeof m_mcfg.iIpToS)) + if (-1 == ::setsockopt(m_iSocket, IPPROTO_IPV6, IPV6_TCLASS, + (const char*)&m_mcfg.iIpToS, sizeof m_mcfg.iIpToS)) { + LOGC(kmlog.Error, log << "setsockopt(IPV6_TCLASS): " << SysStrError(NET_ERROR)); throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR); } + is_set = true; } #endif // For specified IPv6 address, set IP_TOS ONLY WHEN it's an IPv4-mapped-IPv6 - if (IN6_IS_ADDR_UNSPECIFIED(&m_BindAddr.sin6.sin6_addr) || IN6_IS_ADDR_V4MAPPED(&m_BindAddr.sin6.sin6_addr)) + if (!is_set && (adr_unspec || adr_mapped)) { - if (0 != ::setsockopt(m_iSocket, IPPROTO_IP, IP_TOS, (const char*)&m_mcfg.iIpToS, sizeof m_mcfg.iIpToS)) + if (-1 == ::setsockopt(m_iSocket, IPPROTO_IP, IP_TOS, (const char*)&m_mcfg.iIpToS, sizeof m_mcfg.iIpToS)) { + LOGC(kmlog.Error, log << "setsockopt(IP_TOS): " << SysStrError(NET_ERROR) + << (adr_unspec ? " (v6 unspecified)" : " (v6 mapped v4)") + << (using_tclass ? "(fallback to IP_TOS)" : "")); throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR); } + is_set = true; } } + + if (!is_set) + { + LOGC(kmlog.Error, log << "srt_setsockflag(SRTO_IPTOS): No suitable condition for adr=" << m_BindAddr.str() + << " : " << fmt_opt(adr_v6, "v6 ") << fmt_opt(adr_unspec, "unspec ") << fmt_opt(adr_mapped, "mapped")); + throw CUDTException(MJ_SETUP, MN_INVAL, 0); + } } + #ifdef SRT_ENABLE_BINDTODEVICE if (!m_mcfg.sBindToDevice.empty()) { @@ -461,14 +520,10 @@ void srt::CChannel::setUDPSockOpt() throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); } - if (0 != ::setsockopt( - m_iSocket, SOL_SOCKET, SO_BINDTODEVICE, m_mcfg.sBindToDevice.c_str(), m_mcfg.sBindToDevice.size())) + if (-1 == ::setsockopt(m_iSocket, SOL_SOCKET, SO_BINDTODEVICE, + m_mcfg.sBindToDevice.c_str(), m_mcfg.sBindToDevice.size())) { -#if ENABLE_LOGGING - char buf[255]; - const char* err = SysStrError(NET_ERROR, buf, 255); - LOGC(kmlog.Error, log << "setsockopt(SRTO_BINDTODEVICE): " << err); -#endif // ENABLE_LOGGING + LOGC(kmlog.Error, log << "setsockopt(SRTO_BINDTODEVICE): " << SysStrError(NET_ERROR)); throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR); } } @@ -482,7 +537,7 @@ void srt::CChannel::setUDPSockOpt() throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR); #elif defined(_WIN32) u_long nonBlocking = 1; - if (0 != ioctlsocket(m_iSocket, FIONBIO, &nonBlocking)) + if (-1 == ioctlsocket(m_iSocket, FIONBIO, &nonBlocking)) throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR); #else timeval tv; @@ -495,7 +550,7 @@ void srt::CChannel::setUDPSockOpt() tv.tv_usec = 100; #endif // Set receiving time-out value - if (0 != ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVTIMEO, (char*)&tv, sizeof(timeval))) + if (-1 == ::setsockopt(m_iSocket, SOL_SOCKET, SO_RCVTIMEO, (char*)&tv, sizeof(timeval))) throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR); #endif diff --git a/test/test_env.h b/test/test_env.h index b425fa808..5ca1a536e 100644 --- a/test/test_env.h +++ b/test/test_env.h @@ -40,6 +40,16 @@ class TestEnv: public testing::Environment // All must be static, return bool. Arguments allowed. // The name must start with Allowed_. static bool Allowed_IPv6(); + + template + static bool Allowed_Platform(const std::string& first, const Args&... follow) + { + if (first == SRT_TEST_SYSTEM_NAME) + return true; + return Allowed_Platform(follow...); + } + + static bool Allowed_Platform() { return false; } }; #define SRTST_REQUIRES(feature,...) if (!srt::TestEnv::Allowed_##feature(__VA_ARGS__)) { return; } diff --git a/test/test_reuseaddr.cpp b/test/test_reuseaddr.cpp index df4aec487..14789f897 100644 --- a/test/test_reuseaddr.cpp +++ b/test/test_reuseaddr.cpp @@ -531,6 +531,29 @@ TEST_F(ReuseAddr, DiffAddr) shutdownListener(bindsock_2); } +TEST_F(ReuseAddr, UDPOptions) +{ + // IP_TOS and IP_TTL don't work on Windows and Mac + SRTST_REQUIRES(Platform, "Linux", "GNU"); + + // Travis doesn't work with IPv6 + SRTST_REQUIRES(IPv6); + + MAKE_UNIQUE_SOCK(bs1, "general ipv6", prepareServerSocket()); + MAKE_UNIQUE_SOCK(bs2, "mapped ipv4", prepareServerSocket()); + + int val_TOS = 4; // IPTOS_RELIABILITY per , but not available on Windows + int val_TTL = 10; + + EXPECT_NE(srt_setsockflag(bs1, SRTO_IPTOS, &val_TOS, sizeof val_TOS), SRT_ERROR); + EXPECT_NE(srt_setsockflag(bs1, SRTO_IPTTL, &val_TTL, sizeof val_TTL), SRT_ERROR); + EXPECT_NE(srt_setsockflag(bs2, SRTO_IPTOS, &val_TOS, sizeof val_TOS), SRT_ERROR); + EXPECT_NE(srt_setsockflag(bs2, SRTO_IPTTL, &val_TTL, sizeof val_TTL), SRT_ERROR); + + bindSocket(bs1, "::1", 5000, true); + bindSocket(bs2, "::FFFF:127.0.0.1", 5001, true); +} + TEST_F(ReuseAddr, Wildcard) { #if defined(_WIN32) || defined(CYGWIN)