Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reformat with common coding style #90

Open
wants to merge 3 commits into
base: indigo-devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions nodelet/include/nodelet/detail/callback_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,34 +49,33 @@ namespace detail

class CallbackQueueManager;

class NODELETLIB_DECL CallbackQueue : public ros::CallbackQueueInterface,
public boost::enable_shared_from_this<CallbackQueue>
class NODELETLIB_DECL CallbackQueue :
public ros::CallbackQueueInterface,
public boost::enable_shared_from_this<CallbackQueue>
{
public:
CallbackQueue(CallbackQueueManager* parent,
const ros::VoidConstPtr& tracked_object = ros::VoidConstPtr());
~CallbackQueue();
CallbackQueue(CallbackQueueManager* parent, const ros::VoidConstPtr& tracked_object = ros::VoidConstPtr());
~CallbackQueue();

virtual void addCallback(const ros::CallbackInterfacePtr& callback, uint64_t owner_id = 0);
virtual void removeByID(uint64_t owner_id);
virtual void addCallback(const ros::CallbackInterfacePtr& callback, uint64_t owner_id = 0);
virtual void removeByID(uint64_t owner_id);

uint32_t callOne();
uint32_t callOne();

/**
/**
* \brief Enable the queue (queue is enabled by default)
*/
void enable();
/**
void enable();
/**
* \brief Disable the queue, meaning any calls to addCallback() will have no effect
*/
void disable();

void disable();

private:
CallbackQueueManager* parent_;
ros::CallbackQueue queue_;
ros::VoidConstWPtr tracked_object_;
bool has_tracked_object_;
CallbackQueueManager* parent_;
ros::CallbackQueue queue_;
ros::VoidConstWPtr tracked_object_;
bool has_tracked_object_;
};

} // namespace detail
Expand Down
163 changes: 83 additions & 80 deletions nodelet/include/nodelet/detail/callback_queue_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,101 +65,104 @@ typedef boost::shared_ptr<CallbackQueue> CallbackQueuePtr;
class NODELETLIB_DECL CallbackQueueManager
{
public:
/**
/**
* \brief Constructor
*
* By default, uses the number of hardware threads available on the current system.
*/
CallbackQueueManager(uint32_t num_worker_threads = 0);
~CallbackQueueManager();
CallbackQueueManager(uint32_t num_worker_threads = 0);
~CallbackQueueManager();

void addQueue(const CallbackQueuePtr& queue, bool threaded);
void removeQueue(const CallbackQueuePtr& queue);
void callbackAdded(const CallbackQueuePtr& queue);
void addQueue(const CallbackQueuePtr& queue, bool threaded);
void removeQueue(const CallbackQueuePtr& queue);
void callbackAdded(const CallbackQueuePtr& queue);

uint32_t getNumWorkerThreads();
uint32_t getNumWorkerThreads();

void stop();
void stop();

private:
void managerThread();
struct ThreadInfo;
void workerThread(ThreadInfo*);

ThreadInfo* getSmallestQueue();

struct QueueInfo
{
QueueInfo()
: threaded(false)
, thread_index(0xffffffff)
, in_thread(0)
{}

CallbackQueuePtr queue;
bool threaded;

// Only used if threaded == false
boost::mutex st_mutex;
/// @todo Could get rid of st_mutex by updating [thread_index|in_thread] atomically
uint32_t thread_index;
uint32_t in_thread;
};
typedef boost::shared_ptr<QueueInfo> QueueInfoPtr;

typedef boost::unordered_map<CallbackQueue*, QueueInfoPtr> M_Queue;
M_Queue queues_;
boost::mutex queues_mutex_;

/// @todo SRMW lockfree queue. waiting_mutex_ has the potential for a lot of contention
typedef std::vector<CallbackQueuePtr> V_Queue;
V_Queue waiting_;
boost::mutex waiting_mutex_;
boost::condition_variable waiting_cond_;
boost::thread_group tg_;

struct ThreadInfo
{
ThreadInfo()
: calling(0)
{}

/// @todo SRSW lockfree queue
boost::mutex queue_mutex;
boost::condition_variable queue_cond;
std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > queue;
boost::detail::atomic_count calling;
void managerThread();
struct ThreadInfo;
void workerThread(ThreadInfo*);

#ifdef NODELET_QUEUE_DEBUG
struct Record
{
Record(double stamp, uint32_t tasks, bool threaded)
: stamp(stamp), tasks(tasks), threaded(threaded)
{}
ThreadInfo* getSmallestQueue();

double stamp;
uint32_t tasks;
bool threaded;
struct QueueInfo
{
QueueInfo()
: threaded(false)
, thread_index(0xffffffff)
, in_thread(0)
{
}

CallbackQueuePtr queue;
bool threaded;

// Only used if threaded == false
boost::mutex st_mutex;
/// @todo Could get rid of st_mutex by updating [thread_index|in_thread] atomically
uint32_t thread_index;
uint32_t in_thread;
};
typedef boost::shared_ptr<QueueInfo> QueueInfoPtr;

std::vector<Record> history;
typedef boost::unordered_map<CallbackQueue*, QueueInfoPtr> M_Queue;
M_Queue queues_;
boost::mutex queues_mutex_;

/// @todo SRMW lockfree queue. waiting_mutex_ has the potential for a lot of contention
typedef std::vector<CallbackQueuePtr> V_Queue;
V_Queue waiting_;
boost::mutex waiting_mutex_;
boost::condition_variable waiting_cond_;
boost::thread_group tg_;

struct ThreadInfo
{
ThreadInfo()
: calling(0)
{
}

/// @todo SRSW lockfree queue
boost::mutex queue_mutex;
boost::condition_variable queue_cond;
std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> > queue;
boost::detail::atomic_count calling;

#ifdef NODELET_QUEUE_DEBUG
struct Record
{
Record(double stamp, uint32_t tasks, bool threaded)
: stamp(stamp), tasks(tasks), threaded(threaded)
{
}

double stamp;
uint32_t tasks;
bool threaded;
};

std::vector<Record> history;
#endif

// Pad sizeof(ThreadInfo) to be a multiple of 64 (cache line size) to avoid false sharing.
// This still doesn't guarantee ThreadInfo is actually allocated on a cache line boundary though.
static const int ACTUAL_SIZE =
sizeof(boost::mutex) +
sizeof(boost::condition_variable) +
sizeof(std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >) +
sizeof(boost::detail::atomic_count);
uint8_t pad[((ACTUAL_SIZE + 63) & ~63) - ACTUAL_SIZE];
};
/// @todo Use cache-aligned allocator for thread_info_
typedef boost::scoped_array<ThreadInfo> V_ThreadInfo;
V_ThreadInfo thread_info_;

bool running_;
uint32_t num_worker_threads_;
// Pad sizeof(ThreadInfo) to be a multiple of 64 (cache line size) to avoid false sharing.
// This still doesn't guarantee ThreadInfo is actually allocated on a cache line boundary though.
static const int ACTUAL_SIZE =
sizeof(boost::mutex) +
sizeof(boost::condition_variable) +
sizeof(std::vector<std::pair<CallbackQueuePtr, QueueInfoPtr> >) +
sizeof(boost::detail::atomic_count);
uint8_t pad[((ACTUAL_SIZE + 63) & ~63) - ACTUAL_SIZE];
};
/// @todo Use cache-aligned allocator for thread_info_
typedef boost::scoped_array<ThreadInfo> V_ThreadInfo;
V_ThreadInfo thread_info_;

bool running_;
uint32_t num_worker_threads_;
};

} // namespace detail
Expand Down
9 changes: 4 additions & 5 deletions nodelet/include/nodelet/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,12 @@ namespace nodelet
class Exception : public std::runtime_error
{
public:
Exception(const std::string& what)
: std::runtime_error(what)
{}
Exception(const std::string& what)
: std::runtime_error(what)
{
}
};

} // namespace nodelet

#endif // NODELET_EXCEPTION_H


37 changes: 18 additions & 19 deletions nodelet/include/nodelet/loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,35 +62,34 @@ typedef std::vector<std::string> V_string;
class NODELETLIB_DECL Loader
{
public:
/** \brief Construct the nodelet loader with optional ros API at default location of NodeHandle("~")*/
Loader(bool provide_ros_api = true);
/** \brief Construct the nodelet loader with optional ros API in namespace of passed NodeHandle */
Loader(const ros::NodeHandle& server_nh);
/**
/** \brief Construct the nodelet loader with optional ros API at default location of NodeHandle("~")*/
Loader(bool provide_ros_api = true);
/** \brief Construct the nodelet loader with optional ros API in namespace of passed NodeHandle */
Loader(const ros::NodeHandle& server_nh);
/**
* \brief Construct the nodelet loader without ros API, using non-standard factory function to
* create nodelet instances
*/
Loader(const boost::function<boost::shared_ptr<Nodelet> (const std::string& lookup_name)>& create_instance);
Loader(const boost::function<boost::shared_ptr<Nodelet>(const std::string&)>& create_instance);

~Loader();
~Loader();

/** \brief Load a nodelet */
bool load(const std::string& name, const std::string& type, const M_string& remappings,
const V_string& my_argv);
/** \brief Load a nodelet */
bool load(const std::string& name, const std::string& type, const M_string& remappings, const V_string& my_argv);

/** \brief Unload a nodelet */
bool unload(const std::string& name);
/** \brief Unload a nodelet */
bool unload(const std::string& name);

/** \brief Clear all nodelets from this loader */
bool clear();
/** \brief Clear all nodelets from this loader */
bool clear();

/**\brief List the names of all loaded nodelets */
std::vector<std::string> listLoadedNodelets();
/**\brief List the names of all loaded nodelets */
std::vector<std::string> listLoadedNodelets();

private:
boost::mutex lock_; ///<! Public methods must lock this to preserve internal integrity.
struct Impl;
boost::scoped_ptr<Impl> impl_;
boost::mutex lock_; ///<! Public methods must lock this to preserve internal integrity.
struct Impl;
boost::scoped_ptr<Impl> impl_;
};

} // namespace nodelet
Expand Down
Loading