Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fixed the problem of getting option values from groups #2891

Merged
merged 1 commit into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,7 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i
for (size_t i = 0; i < g.m_config.size(); ++i)
{
HLOGC(aclog.Debug, log << "groupConnect: OPTION @" << sid << " #" << g.m_config[i].so);
error_reason = "setting group-derived option: #" + Sprint(g.m_config[i].so);
error_reason = "group-derived option: #" + Sprint(g.m_config[i].so);
ns->core().setOpt(g.m_config[i].so, &g.m_config[i].value[0], (int)g.m_config[i].value.size());
}

Expand Down
86 changes: 55 additions & 31 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,16 @@ static bool getOptDefault(SRT_SOCKOPT optname, void* pw_optval, int& w_optlen)
return true;
}

struct FOptionValue
{
SRT_SOCKOPT expected;
FOptionValue(SRT_SOCKOPT v): expected(v) {}
bool operator()(const CUDTGroup::ConfigItem& i) const
{
return i.so == expected;
}
};

void CUDTGroup::getOpt(SRT_SOCKOPT optname, void* pw_optval, int& w_optlen)
{
// Options handled in group
Expand All @@ -732,46 +742,60 @@ void CUDTGroup::getOpt(SRT_SOCKOPT optname, void* pw_optval, int& w_optlen)
w_optlen = sizeof(bool);
return;

default:; // pass on
}
case SRTO_SNDTIMEO:
*(int*)pw_optval = m_iSndTimeOut;
w_optlen = sizeof(int);
return;

// XXX Suspicous: may require locking of GlobControlLock
// to prevent from deleting a socket in the meantime.
// Deleting a socket requires removing from the group first,
// so after GroupLock this will be either already NULL or
// a valid socket that will only be closed after time in
// the GC, so this is likely safe like all other API functions.
CUDTSocket* ps = 0;
case SRTO_RCVTIMEO:
*(int*)pw_optval = m_iRcvTimeOut;
w_optlen = sizeof(int);
return;

{
// In sockets. All sockets should have all options
// set the same and should represent the group state
// well enough. If there are no sockets, just use default.
case SRTO_GROUPMINSTABLETIMEO:
*(uint32_t*)pw_optval = m_uOPT_MinStabilityTimeout_us / 1000;
w_optlen = sizeof(uint32_t);
return;

// Group lock to protect the container itself.
// Once a socket is extracted, we state it cannot be
// closed without the group send/recv function or closing
// being involved.
ScopedLock lg(m_GroupLock);
if (m_Group.empty())
{
if (!getOptDefault(optname, (pw_optval), (w_optlen)))
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
// Write-only options for security reasons or
// options that refer to a socket state, that
// makes no sense for a group.
case SRTO_PASSPHRASE:
case SRTO_KMSTATE:
case SRTO_PBKEYLEN:
case SRTO_KMPREANNOUNCE:
case SRTO_KMREFRESHRATE:
case SRTO_BINDTODEVICE:
case SRTO_GROUPCONNECT:
case SRTO_STATE:
case SRTO_EVENT:
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);

return;
}
default:; // pass on
}

// Check if the option is in the storage, which means that
// it was modified on the group.

ps = m_Group.begin()->ps;
vector<ConfigItem>::const_iterator i = find_if(m_config.begin(), m_config.end(),
FOptionValue(optname));

// Release the lock on the group, as it's not necessary,
// as well as it might cause a deadlock when combined
// with the others.
if (i == m_config.end())
{
// Not found, see the defaults
if (!getOptDefault(optname, (pw_optval), (w_optlen)))
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);

return;
}

if (!ps)
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
// Found, return the value from the storage.
// Check the size first.
if (w_optlen < int(i->value.size()))
throw CUDTException(MJ_NOTSUP, MN_XSIZE, 0);

return ps->core().getOpt(optname, (pw_optval), (w_optlen));
w_optlen = i->value.size();
memcpy((pw_optval), &i->value[0], i->value.size());
}

SRT_SOCKSTATUS CUDTGroup::getStatus()
Expand Down
127 changes: 127 additions & 0 deletions test/test_bonding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,130 @@ TEST(Bonding, CloseGroupAndSocket)
listen_promise.wait();
}

TEST(Bonding, Options)
{
using namespace std;
using namespace srt;

TestInit srtinit;

// Create a group
const SRTSOCKET grp = srt_create_group(SRT_GTYPE_BROADCAST);

// rendezvous shall not be allowed to be set on the group
// XXX actually it is possible, but no one tested it. POSTPONE.
//int yes = 1;
//EXPECT_EQ(srt_setsockflag(grp, SRTO_RENDEZVOUS, &yes, sizeof yes), SRT_ERROR);

#ifdef SRT_ENABLE_ENCRYPTION
string pass = "longenoughpassword";
// passphrase should be ok.
EXPECT_NE(srt_setsockflag(grp, SRTO_PASSPHRASE, pass.c_str(), pass.size()), SRT_ERROR);
#endif

int lat = 500;
EXPECT_NE(srt_setsockflag(grp, SRTO_RCVLATENCY, &lat, sizeof lat), SRT_ERROR);

mutex mx;
condition_variable latch;
atomic<bool> started {false};

thread accept_and_close { [&]() {

unique_lock<mutex> ux(mx);

SRTSOCKET lsn = srt_create_socket();
#ifdef SRT_ENABLE_ENCRYPTION
EXPECT_NE(srt_setsockflag(lsn, SRTO_PASSPHRASE, pass.c_str(), pass.size()), SRT_ERROR);
#endif
int allow = 1;
ASSERT_NE(srt_setsockflag(lsn, SRTO_GROUPCONNECT, &allow, sizeof allow), SRT_ERROR);
sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET);
ASSERT_NE(srt_bind(lsn, sa.get(), sa.size()), SRT_ERROR);
ASSERT_NE(srt_listen(lsn, 1), SRT_ERROR);
started = true;

// First wait - until it's let go with accepting
latch.wait(ux);

sockaddr_any revsa;
SRTSOCKET gs = srt_accept(lsn, revsa.get(), &revsa.len);
ASSERT_NE(gs, SRT_ERROR);

// Connected, wait to close
latch.wait(ux);

srt_close(gs);
srt_close(lsn);
}};

// Give the thread a chance to start
this_thread::yield();

while (!started)
{
// In case of a bad luck, just wait for the thread to
// acquire the mutex before you do
this_thread::sleep_for(chrono::milliseconds(10));
}

// Wait for the possibility to connect
{
// Make sure that the thread reached the wait() call.
unique_lock<mutex> ux(mx);
latch.notify_all();
}

// Now the thread is accepting, so we call the connect.
sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET);
SRTSOCKET member = srt_connect(grp, sa.get(), sa.size());

// We've released the mutex and signaled the CV, so accept should proceed now.
// Exit from srt_connect() means also exit from srt_accept().

EXPECT_NE(member, SRT_INVALID_SOCK);

// conenct_res should be a socket
EXPECT_NE(member, 0); // XXX Change to SRT_SOCKID_CONNREQ

// Now get the option value from the group

int revlat = -1;
int optsize = sizeof revlat;
EXPECT_NE(srt_getsockflag(grp, SRTO_RCVLATENCY, &revlat, &optsize), SRT_ERROR);
EXPECT_EQ(optsize, sizeof revlat);
EXPECT_EQ(revlat, 500);

revlat = -1;
optsize = sizeof revlat;
// Expect the same value set on the member socket
EXPECT_NE(srt_getsockflag(member, SRTO_RCVLATENCY, &revlat, &optsize), SRT_ERROR);
EXPECT_EQ(optsize, sizeof revlat);
EXPECT_EQ(revlat, 500);

// Individual socket option modified on group
int ohead = 12;
optsize = sizeof ohead;
EXPECT_NE(srt_setsockflag(grp, SRTO_OHEADBW, &ohead, optsize), SRT_ERROR);

// Modifyting a post-option should be possible on a socket
ohead = 11;
optsize = sizeof ohead;
EXPECT_NE(srt_setsockflag(member, SRTO_OHEADBW, &ohead, optsize), SRT_ERROR);

// But getting the option value should be equal to the group setting
EXPECT_NE(srt_getsockflag(grp, SRTO_OHEADBW, &ohead, &optsize), SRT_ERROR);
EXPECT_EQ(optsize, sizeof ohead);
EXPECT_EQ(ohead, 12);

// We're done, the thread can close connection and exit
{
// Make sure that the thread reached the wait() call.
std::unique_lock<std::mutex> ux(mx);
latch.notify_all();
}

accept_and_close.join();
srt_close(grp);
}

Loading