Skip to content

Commit

Permalink
basic subscription works
Browse files Browse the repository at this point in the history
Former-commit-id: e498cc1
  • Loading branch information
jlblancoc committed Jul 31, 2020
1 parent 20c6adb commit 75256a2
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 12 deletions.
9 changes: 5 additions & 4 deletions cmake/mvsim-config.cmake.in
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
@PACKAGE_INIT@

find_package(mrpt-maps REQUIRED)
find_package(mrpt-gui REQUIRED)
#find_package(mrpt-tclap REQUIRED)
include(CMakeFindDependencyMacro)

include(${CMAKE_CURRENT_LIST_DIR}/@[email protected])
foreach(dep @PACKAGE_DEPENDENCIES@)
find_dependency(${dep})
endforeach()

include(${CMAKE_CURRENT_LIST_DIR}/@[email protected])

9 changes: 7 additions & 2 deletions modules/comms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ add_library(${PROJECT_NAME}

source_group(Comms FILES ${Comms_SRCS})

set(PACKAGE_DEPENDENCIES
mrpt-system
mrpt-io
mrpt-serialization
mvsim-msgs
)
mvsim_common_target_settings(${MODULE_NAME})

target_link_libraries(${PROJECT_NAME} PUBLIC mvsim::msgs)

target_link_libraries(${PROJECT_NAME}
PRIVATE
${CMAKE_THREAD_LIBS_INIT}
Expand All @@ -26,6 +30,7 @@ target_link_libraries(${PROJECT_NAME}
mrpt::system
mrpt::io
mrpt::serialization
mvsim::msgs
)

# =========== Python wrapper ==================
Expand Down
1 change: 1 addition & 0 deletions modules/comms/include/mvsim/Comms/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class Client : public mrpt::system::COutputLogger

void internalServiceServingThread();
void internalTopicUpdatesThread();
void internalTopicSubscribeThread(internal::InfoPerSubscribedTopic& ipt);

using service_callback_t =
std::function<std::shared_ptr<google::protobuf::Message>(
Expand Down
60 changes: 57 additions & 3 deletions modules/comms/src/Comms/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ void Client::connect()
#endif

// Create listening socket for subscription updates:
zmq_->topicNotificationsSocket.emplace(zmq_->context, ZMQ_REP);
zmq_->topicNotificationsSocket.emplace(zmq_->context, ZMQ_PAIR);
zmq_->topicNotificationsSocket->bind("tcp://0.0.0.0:*"s);

if (!zmq_->topicNotificationsSocket->connected())
Expand Down Expand Up @@ -384,7 +384,7 @@ void Client::doAdvertiseTopic(

// Retrieve assigned TCP port:
ipat.endpoint = get_zmq_endpoint(ipat.pubSocket);
ipat.topicName = topicName; // redundant in container, but handy.
ipat.topicName = topicName; // redundant in container, but handy.
ipat.descriptor = descriptor;

MRPT_LOG_DEBUG_FMT(
Expand Down Expand Up @@ -442,7 +442,7 @@ void Client::doAdvertiseService(
ZMQ_LAST_ENDPOINT, assignedPort, &assignedPortLen);
assignedPort[assignedPortLen] = '\0';

ips.serviceName = serviceName; // redundant in container, but handy.
ips.serviceName = serviceName; // redundant in container, but handy.
ips.callback = callback;
ips.descInput = descIn;
ips.descOutput = descOut;
Expand Down Expand Up @@ -750,6 +750,9 @@ void Client::doSubscribeTopic(

lck.unlock();

ipt.topicThread =
std::thread([&]() { this->internalTopicSubscribeThread(ipt); });

// Let the server know about our interest in the topic:
mvsim_msgs::SubscribeRequest subReq;
subReq.set_topic(topicName);
Expand All @@ -770,3 +773,54 @@ void Client::doSubscribeTopic(
#endif
MRPT_END
}

void Client::internalTopicSubscribeThread(internal::InfoPerSubscribedTopic& ipt)
{
using namespace std::string_literals;

#if defined(MVSIM_HAS_ZMQ) && defined(MVSIM_HAS_PROTOBUF)
try
{
MRPT_LOG_DEBUG_STREAM(
"[" << nodeName_ << "] Client topic subscribe thread for `"
<< ipt.topicName << "` started.");

zmq::socket_t& s = ipt.subSocket;

for (;;)
{
// Wait for next update from server:
zmq::message_t m = mvsim::receiveMessage(s);

// parse it:
std::cout << "RX topic " << ipt.topicName << " len:" << m.size()
<< "\n";
}
}
catch (const zmq::error_t& e)
{
if (e.num() == ETERM)
{
// This simply means someone called
// requestMainThreadTermination(). Just exit silently.
MRPT_LOG_INFO_STREAM(
"internalTopicSubscribeThread about to exit for ZMQ term "
"signal.");
}
else
{
MRPT_LOG_ERROR_STREAM(
"internalTopicSubscribeThread: ZMQ error: " << e.what());
}
}
catch (const std::exception& e)
{
MRPT_LOG_ERROR_STREAM(
"internalTopicSubscribeThread: Exception: "
<< mrpt::exception_to_str(e));
}
MRPT_LOG_DEBUG_STREAM(
"[" << nodeName_ << "] Client topic subscribe thread quitted.");

#endif
}
2 changes: 1 addition & 1 deletion modules/comms/src/Comms/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ void Server::db_add_topic_subscriber(
}

ASSERT_(mainThreadZMQcontext_);
zmq::socket_t s(*mainThreadZMQcontext_, ZMQ_REQ);
zmq::socket_t s(*mainThreadZMQcontext_, ZMQ_PAIR);
s.connect(updatesEndPoint);
ASSERT_(s.connected());
sendMessage(tiMsg, s);
Expand Down
1 change: 1 addition & 0 deletions modules/msgs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ source_group(ProtobufMessages FILES
${PROTO_PY_FILES}
${PROTOBUF_DEFINITION_FILES}
)
set(PACKAGE_DEPENDENCIES "")
mvsim_common_target_settings(${MODULE_NAME})

if (MVSIM_WITH_PROTOBUF)
Expand Down
8 changes: 6 additions & 2 deletions modules/simulator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ source_group(Friction FILES ${Friction_SRCS})
source_group(VehicleDynamics FILES ${VehicleDynamics_SRCS})
source_group(WorldElements FILES ${WorldElements_SRCS})

set(PACKAGE_DEPENDENCIES
mrpt-maps
mrpt-gui
mvsim-comms
)
mvsim_common_target_settings(${MODULE_NAME})


Expand All @@ -52,6 +57,7 @@ target_link_libraries(${PROJECT_NAME}
mrpt::opengl
mrpt::maps
mrpt::gui
mvsim::comms
${BOX2D_LIBRARIES}
)

Expand All @@ -61,5 +67,3 @@ target_include_directories(${PROJECT_NAME}
$<BUILD_INTERFACE:${mvsim_SOURCE_DIR}/externals/rapidxml>
)

target_link_libraries(${PROJECT_NAME} PUBLIC mvsim::comms)

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <mvsim/mvsim-msgs/Pose.pb.h>

#include <chrono>
#include <iostream>
#include <thread>

void myCallback(const mvsim_msgs::Pose& p)
Expand Down

0 comments on commit 75256a2

Please sign in to comment.