Skip to content

Commit

Permalink
Added group contents check and configurable sleep for group/listener …
Browse files Browse the repository at this point in the history
…tests
  • Loading branch information
Mikołaj Małecki committed Oct 30, 2023
1 parent 6eafd0f commit f91f75a
Showing 1 changed file with 83 additions and 8 deletions.
91 changes: 83 additions & 8 deletions test/test_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,9 @@ TEST(CEPoll, ThreadedUpdate)

void testListenerReady(const bool LATE_CALL, size_t nmembers)
{
bool is_single = true;
bool want_sleep = !TestEnv::me->OptionPresent("nosleep");

sockaddr_in sa;
memset(&sa, 0, sizeof sa);
sa.sin_family = AF_INET;
Expand All @@ -564,6 +567,7 @@ void testListenerReady(const bool LATE_CALL, size_t nmembers)
caller_sock = srt_create_group(SRT_GTYPE_BROADCAST);
int on = 1;
EXPECT_NE(srt_setsockflag(server_sock, SRTO_GROUPCONNECT, &on, sizeof on), SRT_ERROR);
is_single = false;
}
else
{
Expand All @@ -588,16 +592,20 @@ void testListenerReady(const bool LATE_CALL, size_t nmembers)
// We don't need the caller to be async, it can hang up here.
for (size_t i = 0; i < nmembers; ++i)
{
connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa]() {
connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa, i]() {
std::cout << "[T:" << i << "] CALLING\n";
return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa));
}));
}

std::cout << "STARTED connecting...\n";
}

std::cout << "Sleeping 1s...\n";
this_thread::sleep_for(chrono::milliseconds(1000));
if (want_sleep)
{
std::cout << "Sleeping 1s...\n";
this_thread::sleep_for(chrono::milliseconds(1000));
}

// What is important is that the accepted socket is now reporting in
// on the listener socket. So let's create an epoll.
Expand All @@ -616,7 +624,8 @@ void testListenerReady(const bool LATE_CALL, size_t nmembers)
// We don't need the caller to be async, it can hang up here.
for (size_t i = 0; i < nmembers; ++i)
{
connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa]() {
connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa, i]() {
std::cout << "[T:" << i << "] CALLING\n";
return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa));
}));
}
Expand All @@ -640,24 +649,90 @@ void testListenerReady(const bool LATE_CALL, size_t nmembers)
std::cout << "With >1 members, check if there's still UPDATE pending\n";
// Spawn yet another connection within the group, just to get the update
auto extra_call = std::async(std::launch::async, [&caller_sock, &sa]() {
return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa));
});
std::cout << "[T:X] CALLING (expected failure)\n";
return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa));
});
// For 2+ members, additionally check if there AREN'T any
// further acceptance members, but there are UPDATEs.
EXPECT_EQ(srt_epoll_uwait(eid_postcheck, fdset, 1, 5000), 1);

// SUBSCRIBED EVENTS: IN, UPDATE.
// expected: UPDATE only.
EXPECT_EQ(fdset[0].events, SRT_EPOLL_UPDATE);
EXPECT_NE(extra_call.get(), SRT_INVALID_SOCK);
SRTSOCKET joined = extra_call.get();
EXPECT_NE(joined, SRT_INVALID_SOCK);
std::cout << Sprint("Extra joined: @", joined, "\n");
}

std::vector<SRT_SOCKGROUPDATA> gdata;

if (!is_single)
{
EXPECT_EQ(sock & SRTGROUP_MASK, SRTGROUP_MASK);
// +1 because we have added one more caller to check UPDATE event.
size_t inoutlen = nmembers+1;
gdata.resize(inoutlen);
int groupndata = srt_group_data(sock, gdata.data(), (&inoutlen));
EXPECT_NE(groupndata, SRT_ERROR);

std::ostringstream sout;
if (groupndata == SRT_ERROR)
sout << "ERROR: " << srt_getlasterror_str() << " OUTLEN: " << inoutlen << std::endl;
else
{
// Just to display the members
sout << "(Listener) Members: ";

for (int i = 0; i < groupndata; ++i)
sout << "@" << gdata[i].id << " ";
sout << std::endl;
}

std::cout << sout.str();
}

std::cout << "Joining connector thread(s)\n";
for (size_t i = 0; i < nmembers; ++i)
{
EXPECT_NE(connect_res[i].get(), SRT_INVALID_SOCK);
std::cout << "Join: #" << i << ":\n";
SRTSOCKET called_socket = connect_res[i].get();
std::cout << "... " << called_socket << std::endl;
EXPECT_NE(called_socket, SRT_INVALID_SOCK);
}

if (!is_single)
{
EXPECT_EQ(caller_sock & SRTGROUP_MASK, SRTGROUP_MASK);
// +1 because we have added one more caller to check UPDATE event.
size_t inoutlen = nmembers+1;
gdata.resize(inoutlen);
int groupndata = srt_group_data(caller_sock, gdata.data(), (&inoutlen));
EXPECT_NE(groupndata, SRT_ERROR);

std::ostringstream sout;
if (groupndata == SRT_ERROR)
sout << "ERROR: " << srt_getlasterror_str() << " OUTLEN: " << inoutlen << std::endl;
else
{
// Just to display the members
sout << "(Caller) Members: ";

for (int i = 0; i < groupndata; ++i)
sout << "@" << gdata[i].id << " ";
sout << std::endl;
}

std::cout << sout.str();

if (want_sleep)
{
std::cout << "Sleep for 3 seconds to avoid closing-in-between\n";
std::this_thread::sleep_for(std::chrono::seconds(3));
}
}

std::cout << "Releasing EID resources and all sockets\n";

srt_epoll_release(eid);
srt_epoll_release(eid_postcheck);

Expand Down

0 comments on commit f91f75a

Please sign in to comment.