Skip to content

Commit

Permalink
[core] Fixed the problem of getting option values from groups
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikołaj Małecki committed Feb 22, 2024
1 parent 16cb043 commit 2939a2b
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 32 deletions.
2 changes: 1 addition & 1 deletion srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,7 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i
for (size_t i = 0; i < g.m_config.size(); ++i)
{
HLOGC(aclog.Debug, log << "groupConnect: OPTION @" << sid << " #" << g.m_config[i].so);
error_reason = "setting group-derived option: #" + Sprint(g.m_config[i].so);
error_reason = "group-derived option: #" + Sprint(g.m_config[i].so);
ns->core().setOpt(g.m_config[i].so, &g.m_config[i].value[0], (int)g.m_config[i].value.size());
}

Expand Down
86 changes: 55 additions & 31 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,16 @@ static bool getOptDefault(SRT_SOCKOPT optname, void* pw_optval, int& w_optlen)
return true;
}

struct FOptionValue
{
SRT_SOCKOPT expected;
FOptionValue(SRT_SOCKOPT v): expected(v) {}
bool operator()(const CUDTGroup::ConfigItem& i) const
{
return i.so == expected;
}
};

void CUDTGroup::getOpt(SRT_SOCKOPT optname, void* pw_optval, int& w_optlen)
{
// Options handled in group
Expand All @@ -732,46 +742,60 @@ void CUDTGroup::getOpt(SRT_SOCKOPT optname, void* pw_optval, int& w_optlen)
w_optlen = sizeof(bool);
return;

default:; // pass on
}
case SRTO_SNDTIMEO:
*(int*)pw_optval = m_iSndTimeOut;
w_optlen = sizeof(int);
return;

// XXX Suspicous: may require locking of GlobControlLock
// to prevent from deleting a socket in the meantime.
// Deleting a socket requires removing from the group first,
// so after GroupLock this will be either already NULL or
// a valid socket that will only be closed after time in
// the GC, so this is likely safe like all other API functions.
CUDTSocket* ps = 0;
case SRTO_RCVTIMEO:
*(int*)pw_optval = m_iRcvTimeOut;
w_optlen = sizeof(int);
return;

{
// In sockets. All sockets should have all options
// set the same and should represent the group state
// well enough. If there are no sockets, just use default.
case SRTO_GROUPMINSTABLETIMEO:
*(uint32_t*)pw_optval = m_uOPT_MinStabilityTimeout_us / 1000;
w_optlen = sizeof(uint32_t);
return;

// Group lock to protect the container itself.
// Once a socket is extracted, we state it cannot be
// closed without the group send/recv function or closing
// being involved.
ScopedLock lg(m_GroupLock);
if (m_Group.empty())
{
if (!getOptDefault(optname, (pw_optval), (w_optlen)))
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
// Write-only options for security reasons or
// options that refer to a socket state, that
// makes no sense for a group.
case SRTO_PASSPHRASE:
case SRTO_KMSTATE:
case SRTO_PBKEYLEN:
case SRTO_KMPREANNOUNCE:
case SRTO_KMREFRESHRATE:
case SRTO_BINDTODEVICE:
case SRTO_GROUPCONNECT:
case SRTO_STATE:
case SRTO_EVENT:
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);

return;
}
default:; // pass on
}

// Check if the option is in the storage, which means that
// it was modified on the group.

ps = m_Group.begin()->ps;
vector<ConfigItem>::const_iterator i = find_if(m_config.begin(), m_config.end(),
FOptionValue(optname));

// Release the lock on the group, as it's not necessary,
// as well as it might cause a deadlock when combined
// with the others.
if (i == m_config.end())
{
// Not found, see the defaults
if (!getOptDefault(optname, (pw_optval), (w_optlen)))
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);

return;
}

if (!ps)
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
// Found, return the value from the storage.
// Check the size first.
if (w_optlen < int(i->value.size()))
throw CUDTException(MJ_NOTSUP, MN_XSIZE, 0);

return ps->core().getOpt(optname, (pw_optval), (w_optlen));
w_optlen = i->value.size();
memcpy((pw_optval), &i->value[0], i->value.size());
}

SRT_SOCKSTATUS CUDTGroup::getStatus()
Expand Down
127 changes: 127 additions & 0 deletions test/test_bonding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,130 @@ TEST(Bonding, CloseGroupAndSocket)
listen_promise.wait();
}

TEST(Bonding, Options)
{
using namespace std;
using namespace srt;

TestInit srtinit;

// Create a group
const SRTSOCKET grp = srt_create_group(SRT_GTYPE_BROADCAST);

// rendezvous shall not be allowed to be set on the group
// XXX actually it is possible, but no one tested it. POSTPONE.
//int yes = 1;
//EXPECT_EQ(srt_setsockflag(grp, SRTO_RENDEZVOUS, &yes, sizeof yes), SRT_ERROR);

#ifdef SRT_ENABLE_ENCRYPTION
string pass = "longenoughpassword";
// passphrase should be ok.
EXPECT_NE(srt_setsockflag(grp, SRTO_PASSPHRASE, pass.c_str(), pass.size()), SRT_ERROR);
#endif

int lat = 500;
EXPECT_NE(srt_setsockflag(grp, SRTO_RCVLATENCY, &lat, sizeof lat), SRT_ERROR);

mutex mx;
condition_variable latch;
atomic<bool> started {false};

thread accept_and_close { [&]() {

unique_lock<mutex> ux(mx);

SRTSOCKET lsn = srt_create_socket();
#ifdef SRT_ENABLE_ENCRYPTION
EXPECT_NE(srt_setsockflag(lsn, SRTO_PASSPHRASE, pass.c_str(), pass.size()), SRT_ERROR);
#endif
int allow = 1;
ASSERT_NE(srt_setsockflag(lsn, SRTO_GROUPCONNECT, &allow, sizeof allow), SRT_ERROR);
sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET);
ASSERT_NE(srt_bind(lsn, sa.get(), sa.size()), SRT_ERROR);
ASSERT_NE(srt_listen(lsn, 1), SRT_ERROR);
started = true;

// First wait - until it's let go with accepting
latch.wait(ux);

sockaddr_any revsa;
SRTSOCKET gs = srt_accept(lsn, revsa.get(), &revsa.len);
ASSERT_NE(gs, SRT_ERROR);

// Connected, wait to close
latch.wait(ux);

srt_close(gs);
srt_close(lsn);
}};

// Give the thread a chance to start
this_thread::yield();

while (!started)
{
// In case of a bad luck, just wait for the thread to
// acquire the mutex before you do
this_thread::sleep_for(chrono::milliseconds(10));
}

// Wait for the possibility to connect
{
// Make sure that the thread reached the wait() call.
unique_lock<mutex> ux(mx);
latch.notify_all();
}

// Now the thread is accepting, so we call the connect.
sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET);
SRTSOCKET member = srt_connect(grp, sa.get(), sa.size());

// We've released the mutex and signaled the CV, so accept should proceed now.
// Exit from srt_connect() means also exit from srt_accept().

EXPECT_NE(member, SRT_INVALID_SOCK);

// conenct_res should be a socket
EXPECT_NE(member, 0); // XXX Change to SRT_SOCKID_CONNREQ

// Now get the option value from the group

int revlat = -1;
int optsize = sizeof revlat;
EXPECT_NE(srt_getsockflag(grp, SRTO_RCVLATENCY, &revlat, &optsize), SRT_ERROR);
EXPECT_EQ(optsize, sizeof revlat);
EXPECT_EQ(revlat, 500);

revlat = -1;
optsize = sizeof revlat;
// Expect the same value set on the member socket
EXPECT_NE(srt_getsockflag(member, SRTO_RCVLATENCY, &revlat, &optsize), SRT_ERROR);
EXPECT_EQ(optsize, sizeof revlat);
EXPECT_EQ(revlat, 500);

// Individual socket option modified on group
int ohead = 12;
optsize = sizeof ohead;
EXPECT_NE(srt_setsockflag(grp, SRTO_OHEADBW, &ohead, optsize), SRT_ERROR);

// Modifyting a post-option should be possible on a socket
ohead = 11;
optsize = sizeof ohead;
EXPECT_NE(srt_setsockflag(member, SRTO_OHEADBW, &ohead, optsize), SRT_ERROR);

// But getting the option value should be equal to the group setting
EXPECT_NE(srt_getsockflag(grp, SRTO_OHEADBW, &ohead, &optsize), SRT_ERROR);
EXPECT_EQ(optsize, sizeof ohead);
EXPECT_EQ(ohead, 12);

// We're done, the thread can close connection and exit
{
// Make sure that the thread reached the wait() call.
std::unique_lock<std::mutex> ux(mx);
latch.notify_all();
}

accept_and_close.join();
srt_close(grp);
}

0 comments on commit 2939a2b

Please sign in to comment.